diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 4f84dd360..57f05438c 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -31,12 +31,13 @@ BlobFileBuilder::BlobFileBuilder( int job_id, uint32_t column_family_id, const std::string& column_family_name, Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_paths, std::vector* blob_file_additions) : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, env, fs, immutable_cf_options, mutable_cf_options, file_options, job_id, column_family_id, column_family_name, io_priority, write_hint, - blob_file_additions) {} + blob_file_paths, blob_file_additions) {} BlobFileBuilder::BlobFileBuilder( std::function file_number_generator, Env* env, FileSystem* fs, @@ -45,6 +46,7 @@ BlobFileBuilder::BlobFileBuilder( int job_id, uint32_t column_family_id, const std::string& column_family_name, Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_paths, std::vector* blob_file_additions) : file_number_generator_(std::move(file_number_generator)), env_(env), @@ -59,6 +61,7 @@ BlobFileBuilder::BlobFileBuilder( column_family_name_(column_family_name), io_priority_(io_priority), write_hint_(write_hint), + blob_file_paths_(blob_file_paths), blob_file_additions_(blob_file_additions), blob_count_(0), blob_bytes_(0) { @@ -67,7 +70,10 @@ BlobFileBuilder::BlobFileBuilder( assert(fs_); assert(immutable_cf_options_); assert(file_options_); + assert(blob_file_paths_); + assert(blob_file_paths_->empty()); assert(blob_file_additions_); + assert(blob_file_additions_->empty()); } BlobFileBuilder::~BlobFileBuilder() = default; @@ -145,7 +151,7 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { assert(immutable_cf_options_); assert(!immutable_cf_options_->cf_paths.empty()); - const std::string blob_file_path = BlobFileName( + std::string blob_file_path = BlobFileName( immutable_cf_options_->cf_paths.front().path, blob_file_number); std::unique_ptr file; @@ -161,6 +167,12 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { } } + // Note: files get added to blob_file_paths_ right after the open, so they + // can be cleaned up upon failure. Contrast this with blob_file_additions_, + // which only contains successfully written files. + assert(blob_file_paths_); + blob_file_paths_->emplace_back(std::move(blob_file_path)); + assert(file); file->SetIOPriority(io_priority_); file->SetWriteLifeTimeHint(write_hint_); @@ -168,7 +180,7 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { Statistics* const statistics = immutable_cf_options_->statistics; std::unique_ptr file_writer(new WritableFileWriter( - std::move(file), blob_file_path, *file_options_, env_, + std::move(file), blob_file_paths_->back(), *file_options_, env_, nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners, immutable_cf_options_->file_checksum_gen_factory)); diff --git a/db/blob/blob_file_builder.h b/db/blob/blob_file_builder.h index 86018ec01..755ab4350 100644 --- a/db/blob/blob_file_builder.h +++ b/db/blob/blob_file_builder.h @@ -36,6 +36,7 @@ class BlobFileBuilder { const std::string& column_family_name, Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_paths, std::vector* blob_file_additions); BlobFileBuilder(std::function file_number_generator, Env* env, @@ -47,6 +48,7 @@ class BlobFileBuilder { const std::string& column_family_name, Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_paths, std::vector* blob_file_additions); BlobFileBuilder(const BlobFileBuilder&) = delete; @@ -79,6 +81,7 @@ class BlobFileBuilder { std::string column_family_name_; Env::IOPriority io_priority_; Env::WriteLifeTimeHint write_hint_; + std::vector* blob_file_paths_; std::vector* blob_file_additions_; std::unique_ptr writer_; uint64_t blob_count_; diff --git a/db/blob/blob_file_builder_test.cc b/db/blob/blob_file_builder_test.cc index e09cc2340..dbcdcb9b0 100644 --- a/db/blob/blob_file_builder_test.cc +++ b/db/blob/blob_file_builder_test.cc @@ -42,17 +42,15 @@ class BlobFileBuilderTest : public testing::Test { protected: BlobFileBuilderTest() : mock_env_(Env::Default()), fs_(&mock_env_) {} - void VerifyBlobFile(const ImmutableCFOptions& immutable_cf_options, - uint64_t blob_file_number, uint32_t column_family_id, + void VerifyBlobFile(uint64_t blob_file_number, + const std::string& blob_file_path, + uint32_t column_family_id, CompressionType blob_compression_type, const std::vector>& expected_key_value_pairs, const std::vector& blob_indexes) { assert(expected_key_value_pairs.size() == blob_indexes.size()); - const std::string blob_file_path = BlobFileName( - immutable_cf_options.cf_paths.front().path, blob_file_number); - std::unique_ptr file; constexpr IODebugContext* dbg = nullptr; ASSERT_OK( @@ -137,12 +135,14 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); std::vector> expected_key_value_pairs( number_of_blobs); @@ -168,12 +168,20 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { ASSERT_OK(builder.Finish()); // Check the metadata generated + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_paths.size(), 1); + + const std::string& blob_file_path = blob_file_paths[0]; + + ASSERT_EQ(blob_file_path, + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + ASSERT_EQ(blob_file_additions.size(), 1); const auto& blob_file_addition = blob_file_additions[0]; - constexpr uint64_t blob_file_number = 2; - ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number); ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), number_of_blobs); ASSERT_EQ( @@ -181,7 +189,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { number_of_blobs * (BlobLogRecord::kHeaderSize + key_size + value_size)); // Verify the contents of the new blob file as well as the blob references - VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id, + VerifyBlobFile(blob_file_number, blob_file_path, column_family_id, kNoCompression, expected_key_value_pairs, blob_indexes); } @@ -210,12 +218,14 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); std::vector> expected_key_value_pairs( number_of_blobs); @@ -241,12 +251,19 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { ASSERT_OK(builder.Finish()); // Check the metadata generated + ASSERT_EQ(blob_file_paths.size(), number_of_blobs); ASSERT_EQ(blob_file_additions.size(), number_of_blobs); for (size_t i = 0; i < number_of_blobs; ++i) { + const uint64_t blob_file_number = i + 2; + + ASSERT_EQ(blob_file_paths[i], + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + const auto& blob_file_addition = blob_file_additions[i]; - ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), i + 2); + ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number); ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1); ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(), BlobLogRecord::kHeaderSize + key_size + value_size); @@ -258,8 +275,8 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { expected_key_value_pairs[i]}; std::vector blob_index{blob_indexes[i]}; - VerifyBlobFile(immutable_cf_options, i + 2, column_family_id, - kNoCompression, expected_key_value_pair, blob_index); + VerifyBlobFile(i + 2, blob_file_paths[i], column_family_id, kNoCompression, + expected_key_value_pair, blob_index); } } @@ -286,12 +303,14 @@ TEST_F(BlobFileBuilderTest, InlinedValues) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); for (size_t i = 0; i < number_of_blobs; ++i) { const std::string key = std::to_string(i); @@ -308,6 +327,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) { ASSERT_OK(builder.Finish()); // Check the metadata generated + ASSERT_TRUE(blob_file_paths.empty()); ASSERT_TRUE(blob_file_additions.empty()); } @@ -335,12 +355,14 @@ TEST_F(BlobFileBuilderTest, Compression) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); const std::string key("1"); const std::string uncompressed_value(value_size, 'x'); @@ -353,12 +375,20 @@ TEST_F(BlobFileBuilderTest, Compression) { ASSERT_OK(builder.Finish()); // Check the metadata generated + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_paths.size(), 1); + + const std::string& blob_file_path = blob_file_paths[0]; + + ASSERT_EQ(blob_file_path, + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + ASSERT_EQ(blob_file_additions.size(), 1); const auto& blob_file_addition = blob_file_additions[0]; - constexpr uint64_t blob_file_number = 2; - ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number); ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1); @@ -381,7 +411,7 @@ TEST_F(BlobFileBuilderTest, Compression) { {key, compressed_value}}; std::vector blob_indexes{blob_index}; - VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id, + VerifyBlobFile(blob_file_number, blob_file_path, column_family_id, kSnappyCompression, expected_key_value_pairs, blob_indexes); } @@ -407,12 +437,14 @@ TEST_F(BlobFileBuilderTest, CompressionError) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue", [](void* arg) { @@ -430,6 +462,15 @@ TEST_F(BlobFileBuilderTest, CompressionError) { SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); + + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_paths.size(), 1); + ASSERT_EQ(blob_file_paths[0], + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + + ASSERT_TRUE(blob_file_additions.empty()); } TEST_F(BlobFileBuilderTest, Checksum) { @@ -473,12 +514,14 @@ TEST_F(BlobFileBuilderTest, Checksum) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); const std::string key("1"); const std::string value("deadbeef"); @@ -491,12 +534,20 @@ TEST_F(BlobFileBuilderTest, Checksum) { ASSERT_OK(builder.Finish()); // Check the metadata generated + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_paths.size(), 1); + + const std::string& blob_file_path = blob_file_paths[0]; + + ASSERT_EQ(blob_file_path, + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + ASSERT_EQ(blob_file_additions.size(), 1); const auto& blob_file_addition = blob_file_additions[0]; - constexpr uint64_t blob_file_number = 2; - ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number); ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1); ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(), @@ -509,7 +560,7 @@ TEST_F(BlobFileBuilderTest, Checksum) { {key, value}}; std::vector blob_indexes{blob_index}; - VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id, + VerifyBlobFile(blob_file_number, blob_file_path, column_family_id, kNoCompression, expected_key_value_pairs, blob_indexes); } @@ -561,13 +612,14 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; BlobFileBuilder builder(TestFileNumberGenerator(), &fault_injection_env_, &fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, - &blob_file_additions); + &blob_file_paths, &blob_file_additions); SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { fault_injection_env_.SetFilesystemActive(false, @@ -584,6 +636,19 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) { SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); + + if (sync_point_ == "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile") { + ASSERT_TRUE(blob_file_paths.empty()); + } else { + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_paths.size(), 1); + ASSERT_EQ(blob_file_paths[0], + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + } + + ASSERT_TRUE(blob_file_additions.empty()); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/builder.cc b/db/builder.cc index 9dfdf4217..a8bfff264 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -13,6 +13,7 @@ #include #include +#include "db/blob/blob_file_builder.h" #include "db/compaction/compaction_iterator.h" #include "db/dbformat.h" #include "db/event_helpers.h" @@ -67,13 +68,14 @@ TableBuilder* NewTableBuilder( } Status BuildTable( - const std::string& dbname, Env* env, FileSystem* fs, + const std::string& dbname, VersionSet* versions, Env* env, FileSystem* fs, const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, const FileOptions& file_options, TableCache* table_cache, InternalIterator* iter, std::vector> range_del_iters, - FileMetaData* meta, const InternalKeyComparator& internal_comparator, + FileMetaData* meta, std::vector* blob_file_additions, + const InternalKeyComparator& internal_comparator, const std::vector>* int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, @@ -107,6 +109,7 @@ Status BuildTable( std::string fname = TableFileName(ioptions.cf_paths, meta->fd.GetNumber(), meta->fd.GetPathId()); + std::vector blob_file_paths; std::string file_checksum = kUnknownFileChecksum; std::string file_checksum_func_name = kUnknownFileChecksumFuncName; #ifndef ROCKSDB_LITE @@ -163,11 +166,22 @@ Status BuildTable( snapshots.empty() ? 0 : snapshots.back(), snapshot_checker); + std::unique_ptr blob_file_builder( + (mutable_cf_options.enable_blob_files && blob_file_additions) + ? new BlobFileBuilder(versions, env, fs, &ioptions, + &mutable_cf_options, &file_options, job_id, + column_family_id, column_family_name, + io_priority, write_hint, &blob_file_paths, + blob_file_additions) + : nullptr); + CompactionIterator c_iter( iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, &snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, ShouldReportDetailedTime(env, ioptions.statistics), - true /* internal key corruption is not ok */, range_del_agg.get()); + true /* internal key corruption is not ok */, range_del_agg.get(), + blob_file_builder.get()); + c_iter.SeekToFirst(); for (; c_iter.Valid(); c_iter.Next()) { const Slice& key = c_iter.key(); @@ -200,9 +214,16 @@ Status BuildTable( } // Finish and check for builder errors - bool empty = builder->IsEmpty(); s = c_iter.status(); + + if (blob_file_builder) { + if (s.ok()) { + s = blob_file_builder->Finish(); + } + } + TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable"); + const bool empty = builder->IsEmpty(); if (!s.ok() || empty) { builder->Abandon(); } else { @@ -290,7 +311,19 @@ Status BuildTable( } if (!s.ok() || meta->fd.GetFileSize() == 0) { - fs->DeleteFile(fname, IOOptions(), nullptr); + constexpr IODebugContext* dbg = nullptr; + + fs->DeleteFile(fname, IOOptions(), dbg); + + assert(blob_file_additions || blob_file_paths.empty()); + + if (blob_file_additions) { + for (const std::string& blob_file_path : blob_file_paths) { + fs->DeleteFile(blob_file_path, IOOptions(), dbg); + } + + blob_file_additions->clear(); + } } if (meta->fd.GetFileSize() == 0) { diff --git a/db/builder.h b/db/builder.h index bcabb7935..8c80c6379 100644 --- a/db/builder.h +++ b/db/builder.h @@ -27,8 +27,10 @@ namespace ROCKSDB_NAMESPACE { struct Options; struct FileMetaData; +class VersionSet; class Env; struct EnvOptions; +class BlobFileAddition; class Iterator; class SnapshotChecker; class TableCache; @@ -63,13 +65,14 @@ TableBuilder* NewTableBuilder( // @param column_family_name Name of the column family that is also identified // by column_family_id, or empty string if unknown. extern Status BuildTable( - const std::string& dbname, Env* env, FileSystem* fs, + const std::string& dbname, VersionSet* versions, Env* env, FileSystem* fs, const ImmutableCFOptions& options, const MutableCFOptions& mutable_cf_options, const FileOptions& file_options, TableCache* table_cache, InternalIterator* iter, std::vector> range_del_iters, - FileMetaData* meta, const InternalKeyComparator& internal_comparator, + FileMetaData* meta, std::vector* blob_file_additions, + const InternalKeyComparator& internal_comparator, const std::vector>* int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 9076a256a..7f249632f 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -3,9 +3,11 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +#include "db/compaction/compaction_iterator.h" + #include -#include "db/compaction/compaction_iterator.h" +#include "db/blob/blob_file_builder.h" #include "db/snapshot_checker.h" #include "port/likely.h" #include "rocksdb/listener.h" @@ -36,7 +38,8 @@ CompactionIterator::CompactionIterator( SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, - CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction, + CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, const Compaction* compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, @@ -46,6 +49,7 @@ CompactionIterator::CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, report_detailed_time, expect_valid_internal_key, range_del_agg, + blob_file_builder, std::unique_ptr( compaction ? new CompactionProxy(compaction) : nullptr), compaction_filter, shutting_down, preserve_deletes_seqnum, @@ -58,6 +62,7 @@ CompactionIterator::CompactionIterator( const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, std::unique_ptr compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, @@ -74,6 +79,7 @@ CompactionIterator::CompactionIterator( report_detailed_time_(report_detailed_time), expect_valid_internal_key_(expect_valid_internal_key), range_del_agg_(range_del_agg), + blob_file_builder_(blob_file_builder), compaction_(std::move(compaction)), compaction_filter_(compaction_filter), shutting_down_(shutting_down), @@ -661,20 +667,37 @@ void CompactionIterator::NextFromInput() { void CompactionIterator::PrepareOutput() { if (valid_) { - if (compaction_filter_ && ikey_.type == kTypeBlobIndex) { - const auto blob_decision = compaction_filter_->PrepareBlobOutput( - user_key(), value_, &compaction_filter_value_); - - if (blob_decision == CompactionFilter::BlobDecision::kCorruption) { - status_ = Status::Corruption( - "Corrupted blob reference encountered during GC"); - valid_ = false; - } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) { - status_ = Status::IOError("Could not relocate blob during GC"); - valid_ = false; - } else if (blob_decision == - CompactionFilter::BlobDecision::kChangeValue) { - value_ = compaction_filter_value_; + if (ikey_.type == kTypeValue) { + if (blob_file_builder_) { + blob_index_.clear(); + const Status s = + blob_file_builder_->Add(user_key(), value_, &blob_index_); + + if (!s.ok()) { + status_ = s; + valid_ = false; + } else if (!blob_index_.empty()) { + value_ = blob_index_; + ikey_.type = kTypeBlobIndex; + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + } + } + } else if (ikey_.type == kTypeBlobIndex) { + if (compaction_filter_) { + const auto blob_decision = compaction_filter_->PrepareBlobOutput( + user_key(), value_, &compaction_filter_value_); + + if (blob_decision == CompactionFilter::BlobDecision::kCorruption) { + status_ = Status::Corruption( + "Corrupted blob reference encountered during GC"); + valid_ = false; + } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) { + status_ = Status::IOError("Could not relocate blob during GC"); + valid_ = false; + } else if (blob_decision == + CompactionFilter::BlobDecision::kChangeValue) { + value_ = compaction_filter_value_; + } } } diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index dc7984180..e5b1cc8b1 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -21,6 +21,8 @@ namespace ROCKSDB_NAMESPACE { +class BlobFileBuilder; + class CompactionIterator { public: // A wrapper around Compaction. Has a much smaller interface, only what @@ -66,6 +68,7 @@ class CompactionIterator { const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, @@ -81,6 +84,7 @@ class CompactionIterator { const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, std::unique_ptr compaction, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, @@ -163,6 +167,7 @@ class CompactionIterator { bool report_detailed_time_; bool expect_valid_internal_key_; CompactionRangeDelAggregator* range_del_agg_; + BlobFileBuilder* blob_file_builder_; std::unique_ptr compaction_; const CompactionFilter* compaction_filter_; const std::atomic* shutting_down_; @@ -211,6 +216,7 @@ class CompactionIterator { // PinnedIteratorsManager used to pin input_ Iterator blocks while reading // merge operands and then releasing them after consuming them. PinnedIteratorsManager pinned_iters_mgr_; + std::string blob_index_; std::string compaction_filter_value_; InternalKey compaction_filter_skip_until_; // "level_ptrs" holds indices that remember which file of an associated diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 76a207e00..c134013a6 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -258,7 +258,8 @@ class CompactionIteratorTest : public testing::TestWithParam { iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_, earliest_write_conflict_snapshot, snapshot_checker_.get(), Env::Default(), false /* report_detailed_time */, false, - range_del_agg_.get(), std::move(compaction), filter, &shutting_down_)); + range_del_agg_.get(), nullptr /* blob_file_builder */, + std::move(compaction), filter, &shutting_down_)); } void AddSnapshot(SequenceNumber snapshot, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 603a0c981..672d40b96 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -914,9 +914,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), /*expect_valid_internal_key=*/true, &range_del_agg, - sub_compact->compaction, compaction_filter, shutting_down_, - preserve_deletes_seqnum_, manual_compaction_paused_, - db_options_.info_log)); + /* blob_file_builder */ nullptr, sub_compact->compaction, + compaction_filter, shutting_down_, preserve_deletes_seqnum_, + manual_compaction_paused_, db_options_.info_log)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 81c0a478d..529acbc51 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -11,6 +11,7 @@ #include "db/db_impl/db_impl.h" #include "db/db_test_util.h" +#include "file/filename.h" #include "port/port.h" #include "port/stack_trace.h" #include "test_util/sync_point.h" @@ -443,6 +444,169 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { } #endif // !ROCKSDB_LITE +TEST_F(DBFlushTest, FlushWithBlob) { + constexpr uint64_t min_blob_size = 10; + + Options options; + options.enable_blob_files = true; + options.min_blob_size = min_blob_size; + options.disable_auto_compactions = true; + + Reopen(options); + + constexpr char short_value[] = "short"; + static_assert(sizeof(short_value) - 1 < min_blob_size, + "short_value too long"); + + constexpr char long_value[] = "long_value"; + static_assert(sizeof(long_value) - 1 >= min_blob_size, + "long_value too short"); + + ASSERT_OK(Put("key1", short_value)); + ASSERT_OK(Put("key2", long_value)); + + ASSERT_OK(Flush()); + + ASSERT_EQ(Get("key1"), short_value); + + // TODO: enable once Get support is implemented for blobs + // ASSERT_EQ(Get("key2"), long_value); + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& l0_files = storage_info->LevelFiles(0); + ASSERT_EQ(l0_files.size(), 1); + + const FileMetaData* const table_file = l0_files[0]; + assert(table_file); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_EQ(blob_files.size(), 1); + + const auto& blob_file = blob_files.begin()->second; + assert(blob_file); + + ASSERT_EQ(table_file->smallest.user_key(), "key1"); + ASSERT_EQ(table_file->largest.user_key(), "key2"); + ASSERT_EQ(table_file->fd.smallest_seqno, 1); + ASSERT_EQ(table_file->fd.largest_seqno, 2); + ASSERT_EQ(table_file->oldest_blob_file_number, + blob_file->GetBlobFileNumber()); + + ASSERT_EQ(blob_file->GetTotalBlobCount(), 1); + +#ifndef ROCKSDB_LITE + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const uint64_t expected_bytes = + table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes(); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_FALSE(compaction_stats.empty()); + ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes); + ASSERT_EQ(compaction_stats[0].num_output_files, 2); + + const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); + ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes); +#endif // ROCKSDB_LITE +} + +class DBFlushTestBlobError : public DBFlushTest, + public testing::WithParamInterface { + public: + DBFlushTestBlobError() : fault_injection_env_(env_) {} + ~DBFlushTestBlobError() { Close(); } + + FaultInjectionTestEnv fault_injection_env_; +}; + +INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError, + ::testing::ValuesIn(std::vector{ + "BlobFileBuilder::WriteBlobToFile:AddRecord", + "BlobFileBuilder::WriteBlobToFile:AppendFooter"})); + +TEST_P(DBFlushTestBlobError, FlushError) { + Options options; + options.enable_blob_files = true; + options.disable_auto_compactions = true; + options.env = &fault_injection_env_; + + Reopen(options); + + ASSERT_OK(Put("key", "blob")); + + SyncPoint::GetInstance()->SetCallBack(GetParam(), [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(false, Status::IOError()); + }); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeFinishBuildTable", [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(true); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_NOK(Flush()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& l0_files = storage_info->LevelFiles(0); + ASSERT_TRUE(l0_files.empty()); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_TRUE(blob_files.empty()); + + // Make sure the files generated by the failed job have been deleted + std::vector files; + ASSERT_OK(env_->GetChildren(dbname_, &files)); + for (const auto& file : files) { + uint64_t number = 0; + FileType type = kTableFile; + + if (!ParseFileName(file, &number, &type)) { + continue; + } + + ASSERT_NE(type, kTableFile); + ASSERT_NE(type, kBlobFile); + } + +#ifndef ROCKSDB_LITE + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_FALSE(compaction_stats.empty()); + ASSERT_EQ(compaction_stats[0].bytes_written, 0); + ASSERT_EQ(compaction_stats[0].num_output_files, 0); + + const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); + ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], 0); +#endif // ROCKSDB_LITE +} + TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { Options options = CurrentOptions(); options.create_if_missing = true; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index d5dea4323..fe034a493 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -142,6 +142,7 @@ Status DBImpl::FlushMemTableToOutputFile( SnapshotChecker* snapshot_checker, LogBuffer* log_buffer, Env::Priority thread_pri) { mutex_.AssertHeld(); + assert(cfd); assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); @@ -203,10 +204,28 @@ Status DBImpl::FlushMemTableToOutputFile( if (made_progress) { *made_progress = true; } + + const std::string& column_family_name = cfd->GetName(); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + VersionStorageInfo::LevelSummaryStorage tmp; ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", - cfd->GetName().c_str(), - cfd->current()->storage_info()->LevelSummary(&tmp)); + column_family_name.c_str(), + storage_info->LevelSummary(&tmp)); + + const auto& blob_files = storage_info->GetBlobFiles(); + if (!blob_files.empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 + "\n", + column_family_name.c_str(), blob_files.begin()->first, + blob_files.rbegin()->first); + } } if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) { @@ -544,16 +563,36 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( assert(num_cfs == static_cast(job_context->superversion_contexts.size())); for (int i = 0; i != num_cfs; ++i) { + assert(cfds[i]); + if (cfds[i]->IsDropped()) { continue; } InstallSuperVersionAndScheduleWork(cfds[i], &job_context->superversion_contexts[i], all_mutable_cf_options[i]); + + const std::string& column_family_name = cfds[i]->GetName(); + + Version* const current = cfds[i]->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + VersionStorageInfo::LevelSummaryStorage tmp; ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", - cfds[i]->GetName().c_str(), - cfds[i]->current()->storage_info()->LevelSummary(&tmp)); + column_family_name.c_str(), + storage_info->LevelSummary(&tmp)); + + const auto& blob_files = storage_info->GetBlobFiles(); + if (!blob_files.empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Blob file summary: head=%" PRIu64 + ", tail=%" PRIu64 "\n", + column_family_name.c_str(), blob_files.begin()->first, + blob_files.rbegin()->first); + } } if (made_progress) { *made_progress = true; diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 72b6a8ef9..ea446037d 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1272,7 +1272,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, MemTable* mem, VersionEdit* edit) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); + FileMetaData meta; + std::unique_ptr::iterator> pending_outputs_inserted_elem( new std::list::iterator( CaptureCurrentFileNumberInPendingOutputs())); @@ -1319,11 +1321,13 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, if (range_del_iter != nullptr) { range_del_iters.emplace_back(range_del_iter); } + IOStatus io_s; s = BuildTable( - dbname_, env_, fs_.get(), *cfd->ioptions(), mutable_cf_options, - file_options_for_compaction_, cfd->table_cache(), iter.get(), - std::move(range_del_iters), &meta, cfd->internal_comparator(), + dbname_, versions_.get(), env_, fs_.get(), *cfd->ioptions(), + mutable_cf_options, file_options_for_compaction_, cfd->table_cache(), + iter.get(), std::move(range_del_iters), &meta, + nullptr /* blob_file_additions */, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), diff --git a/db/flush_job.cc b/db/flush_job.cc index 088c06723..6b0b3497e 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -266,6 +266,13 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, stream << vstorage->NumLevelFiles(level); } stream.EndArray(); + + const auto& blob_files = vstorage->GetBlobFiles(); + if (!blob_files.empty()) { + stream << "blob_file_head" << blob_files.begin()->first; + stream << "blob_file_tail" << blob_files.rbegin()->first; + } + stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed(); if (measure_io_stats_) { @@ -300,6 +307,9 @@ Status FlushJob::WriteLevel0Table() { const uint64_t start_micros = db_options_.env->NowMicros(); const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000; Status s; + + std::vector blob_file_additions; + { auto write_hint = cfd_->CalculateSSTWriteHint(0); db_mutex_->Unlock(); @@ -388,9 +398,10 @@ Status FlushJob::WriteLevel0Table() { IOStatus io_s; s = BuildTable( - dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(), - mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(), - std::move(range_del_iters), &meta_, cfd_->internal_comparator(), + dbname_, versions_, db_options_.env, db_options_.fs.get(), + *cfd_->ioptions(), mutable_cf_options_, file_options_, + cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_, + &blob_file_additions, cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), cfd_->GetName(), existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, @@ -425,7 +436,10 @@ Status FlushJob::WriteLevel0Table() { // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. - if (s.ok() && meta_.fd.GetFileSize() > 0) { + const bool has_output = meta_.fd.GetFileSize() > 0; + assert(has_output || blob_file_additions.empty()); + + if (s.ok() && has_output) { // if we have more than 1 background thread, then we cannot // insert files directly into higher levels because some other // threads could be concurrently producing compacted files for @@ -437,6 +451,8 @@ Status FlushJob::WriteLevel0Table() { meta_.marked_for_compaction, meta_.oldest_blob_file_number, meta_.oldest_ancester_time, meta_.file_creation_time, meta_.file_checksum, meta_.file_checksum_func_name); + + edit_->SetBlobFileAdditions(std::move(blob_file_additions)); } #ifndef ROCKSDB_LITE // Piggyback FlushJobInfo on the first first flushed memtable. @@ -447,11 +463,22 @@ Status FlushJob::WriteLevel0Table() { InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); stats.micros = db_options_.env->NowMicros() - start_micros; stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros; - stats.bytes_written = meta_.fd.GetFileSize(); + + if (has_output) { + stats.bytes_written = meta_.fd.GetFileSize(); + + const auto& blobs = edit_->GetBlobFileAdditions(); + for (const auto& blob : blobs) { + stats.bytes_written += blob.GetTotalBlobBytes(); + } + + stats.num_output_files = static_cast(blobs.size()) + 1; + } + RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros); cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats); cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, - meta_.fd.GetFileSize()); + stats.bytes_written); RecordFlushIOStats(); return s; } diff --git a/db/internal_stats.h b/db/internal_stats.h index ce83be244..1e2de502a 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -392,6 +392,8 @@ class InternalStats { bool GetIntPropertyOutOfMutex(const DBPropertyInfo& property_info, Version* version, uint64_t* value); + const uint64_t* TEST_GetCFStatsValue() const { return cf_stats_value_; } + const std::vector& TEST_GetCompactionStats() const { return comp_stats_; } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index e2db297e1..dced9f7db 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -445,9 +445,18 @@ Status MemTableList::TryInstallMemtableFlushResults( } if (it == memlist.rbegin() || batch_file_number != m->file_number_) { batch_file_number = m->file_number_; - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 " started", - cfd->GetName().c_str(), m->file_number_); + if (m->edit_.GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 " started", + cfd->GetName().c_str(), m->file_number_); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + " (+%zu blob files) started", + cfd->GetName().c_str(), m->file_number_, + m->edit_.GetBlobFileAdditions().size()); + } + edit_list.push_back(&m->edit_); memtables_to_flush.push_back(m); #ifndef ROCKSDB_LITE @@ -502,9 +511,20 @@ Status MemTableList::TryInstallMemtableFlushResults( if (s.ok() && !cfd->IsDropped()) { // commit new state while (batch_count-- > 0) { MemTable* m = current_->memlist_.back(); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " done", - cfd->GetName().c_str(), m->file_number_, mem_id); + if (m->edit_.GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " done", + cfd->GetName().c_str(), m->file_number_, mem_id); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + " (+%zu blob files)" + ": memtable #%" PRIu64 " done", + cfd->GetName().c_str(), m->file_number_, + m->edit_.GetBlobFileAdditions().size(), mem_id); + } + assert(m->file_number_ > 0); current_->Remove(m, to_delete); UpdateCachedValuesFromMemTableListVersion(); @@ -515,9 +535,20 @@ Status MemTableList::TryInstallMemtableFlushResults( for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) { MemTable* m = *it; // commit failed. setup state so that we can flush again. - ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " failed", - m->file_number_, mem_id); + if (m->edit_.GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " failed", + m->file_number_, mem_id); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "Level-0 commit table #%" PRIu64 + " (+%zu blob files)" + ": memtable #%" PRIu64 " failed", + m->file_number_, + m->edit_.GetBlobFileAdditions().size(), mem_id); + } + m->flush_completed_ = false; m->flush_in_progress_ = false; m->edit_.Clear(); @@ -713,11 +744,25 @@ Status InstallMemtableAtomicFlushResults( for (auto m : *mems_list[i]) { assert(m->GetFileNumber() > 0); uint64_t mem_id = m->GetID(); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " done", - cfds[i]->GetName().c_str(), m->GetFileNumber(), - mem_id); + + const VersionEdit* const edit = m->GetEdits(); + assert(edit); + + if (edit->GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " done", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + mem_id); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + " (+%zu blob files)" + ": memtable #%" PRIu64 " done", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + edit->GetBlobFileAdditions().size(), mem_id); + } + imm->current_->Remove(m, to_delete); imm->UpdateCachedValuesFromMemTableListVersion(); imm->ResetTrimHistoryNeeded(); @@ -728,11 +773,25 @@ Status InstallMemtableAtomicFlushResults( auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i); for (auto m : *mems_list[i]) { uint64_t mem_id = m->GetID(); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " failed", - cfds[i]->GetName().c_str(), m->GetFileNumber(), - mem_id); + + const VersionEdit* const edit = m->GetEdits(); + assert(edit); + + if (edit->GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " failed", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + mem_id); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + " (+%zu blob files)" + ": memtable #%" PRIu64 " failed", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + edit->GetBlobFileAdditions().size(), mem_id); + } + m->SetFlushCompleted(false); m->SetFlushInProgress(false); m->GetEdits()->Clear(); diff --git a/db/repair.cc b/db/repair.cc index 259892698..a96af2c3e 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -429,18 +429,19 @@ class Repairer { LegacyFileSystemWrapper fs(env_); IOStatus io_s; status = BuildTable( - dbname_, env_, &fs, *cfd->ioptions(), + dbname_, /* versions */ nullptr, env_, &fs, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), env_options_, table_cache_, iter.get(), std::move(range_del_iters), &meta, - cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), - cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, - snapshot_checker, kNoCompression, 0 /* sample_for_compression */, - CompressionOptions(), false, nullptr /* internal_stats */, - TableFileCreationReason::kRecovery, &io_s, nullptr /*IOTracer*/, - nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, - nullptr /* table_properties */, -1 /* level */, current_time, - 0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */, - "DB Repairer" /* db_id */, db_session_id_); + nullptr /* blob_file_additions */, cfd->internal_comparator(), + cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), + {}, kMaxSequenceNumber, snapshot_checker, kNoCompression, + 0 /* sample_for_compression */, CompressionOptions(), false, + nullptr /* internal_stats */, TableFileCreationReason::kRecovery, + &io_s, nullptr /*IOTracer*/, nullptr /* event_logger */, + 0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */, + -1 /* level */, current_time, 0 /* oldest_key_time */, write_hint, + 0 /* file_creation_time */, "DB Repairer" /* db_id */, + db_session_id_); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), diff --git a/db/version_edit.cc b/db/version_edit.cc index f27b3eaab..8879f0e1b 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -543,7 +543,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) { return s; } - blob_file_additions_.emplace_back(blob_file_addition); + AddBlobFile(std::move(blob_file_addition)); break; } @@ -554,7 +554,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) { return s; } - blob_file_garbages_.emplace_back(blob_file_garbage); + AddBlobFileGarbage(std::move(blob_file_garbage)); break; } diff --git a/db/version_edit.h b/db/version_edit.h index a84e3de09..d2d92c272 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -414,12 +414,20 @@ class VersionEdit { std::move(checksum_method), std::move(checksum_value)); } + void AddBlobFile(BlobFileAddition blob_file_addition) { + blob_file_additions_.emplace_back(std::move(blob_file_addition)); + } + // Retrieve all the blob files added. using BlobFileAdditions = std::vector; const BlobFileAdditions& GetBlobFileAdditions() const { return blob_file_additions_; } + void SetBlobFileAdditions(BlobFileAdditions blob_file_additions) { + blob_file_additions_ = std::move(blob_file_additions); + } + // Add garbage for an existing blob file. Note: intentionally broken English // follows. void AddBlobFileGarbage(uint64_t blob_file_number, @@ -429,12 +437,20 @@ class VersionEdit { garbage_blob_bytes); } + void AddBlobFileGarbage(BlobFileGarbage blob_file_garbage) { + blob_file_garbages_.emplace_back(std::move(blob_file_garbage)); + } + // Retrieve all the blob file garbage added. using BlobFileGarbages = std::vector; const BlobFileGarbages& GetBlobFileGarbages() const { return blob_file_garbages_; } + void SetBlobFileGarbages(BlobFileGarbages blob_file_garbages) { + blob_file_garbages_ = std::move(blob_file_garbages); + } + // Add a WAL (either just created or closed). void AddWal(WalNumber number, WalMetadata metadata = WalMetadata()) { wal_additions_.emplace_back(number, std::move(metadata));