From 54de56844d0a0bd6555fb0851a52ab0752987370 Mon Sep 17 00:00:00 2001
From: Yanqin Jin
Date: Fri, 27 Jul 2018 16:00:26 -0700
Subject: [PATCH] Remove random writes from SST file ingestion (#4172)

Summary:
RocksDB used to store global_seqno in external SST files written by
SstFileWriter. During file ingestion, RocksDB uses `pwrite` to update the
`global_seqno`. Since random writes are not supported by some
non-POSIX-compliant file systems, external SST file ingestion is not
supported on those file systems. To address this limitation, we no longer
update `global_seqno` during file ingestion. Instead, RocksDB uses the
MANIFEST and other information in the table properties to deduce the global
seqno for externally ingested SST files.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/4172

Differential Revision: D8961465

Pulled By: riversand963

fbshipit-source-id: 4382ec85270a96be5bc0cf33758ca2b167b05071
---
 db/compaction_picker_test.cc          |  4 +-
 db/compaction_picker_universal.cc     | 18 +++---
 db/db_impl.cc                         |  4 +-
 db/db_impl_compaction_flush.cc        | 14 ++---
 db/db_impl_experimental.cc            |  2 +-
 db/db_impl_open.cc                    |  2 +-
 db/db_range_del_test.cc               |  2 +-
 db/external_sst_file_ingestion_job.cc | 44 ++++++++-------
 db/external_sst_file_ingestion_job.h  |  3 +-
 db/flush_job.cc                       |  2 +-
 db/flush_job_test.cc                  | 15 ++---
 db/table_cache.cc                     |  2 +-
 db/version_builder.cc                 | 32 ++++++-----
 db/version_builder_test.cc            |  4 +-
 db/version_edit.cc                    | 29 ++++++----
 db/version_edit.h                     | 35 +++++++-----
 db/version_set.cc                     | 24 ++++----
 include/rocksdb/options.h             | 12 ++++
 table/block_based_table_factory.cc    |  2 +-
 table/block_based_table_reader.cc     | 80 ++++++++++++++++++---------
 table/block_based_table_reader.h      |  1 +
 table/table_builder.h                 | 17 +++++-
 table/table_test.cc                   |  9 ++-
 tools/db_stress.cc                    |  4 +-
 24 files changed, 222 insertions(+), 139 deletions(-)

diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc
index 44bc5f0c2..40daf9cec 100644
--- a/db/compaction_picker_test.cc
+++ b/db/compaction_picker_test.cc
@@ -90,8 +90,8 @@ class CompactionPickerTest : public testing::Test {
     f->fd = FileDescriptor(file_number, path_id, file_size);
     f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
     f->largest = InternalKey(largest, largest_seq, kTypeValue);
-    f->smallest_seqno = smallest_seq;
-    f->largest_seqno = largest_seq;
+    f->fd.smallest_seqno = smallest_seq;
+    f->fd.largest_seqno = largest_seq;
     f->compensated_file_size = file_size;
     f->refs = 0;
     vstorage_->AddFile(level, f);
diff --git a/db/compaction_picker_universal.cc b/db/compaction_picker_universal.cc
index a371415cb..c0cf2b062 100644
--- a/db/compaction_picker_universal.cc
+++ b/db/compaction_picker_universal.cc
@@ -97,17 +97,17 @@ void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
                              SequenceNumber* largest_seqno) {
   bool is_first = true;
   for (FileMetaData* f : files) {
-    assert(f->smallest_seqno <= f->largest_seqno);
+    assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
     if (is_first) {
       is_first = false;
-      *smallest_seqno = f->smallest_seqno;
-      *largest_seqno = f->largest_seqno;
+      *smallest_seqno = f->fd.smallest_seqno;
+      *largest_seqno = f->fd.largest_seqno;
     } else {
-      if (f->smallest_seqno < *smallest_seqno) {
-        *smallest_seqno = f->smallest_seqno;
+      if (f->fd.smallest_seqno < *smallest_seqno) {
+        *smallest_seqno = f->fd.smallest_seqno;
       }
-      if (f->largest_seqno > *largest_seqno) {
-        *largest_seqno = f->largest_seqno;
+      if (f->fd.largest_seqno > *largest_seqno) {
+        *largest_seqno = f->fd.largest_seqno;
       }
     }
   }
@@ -365,11 +365,11 @@ Compaction*
UniversalCompactionPicker::PickCompaction( size_t level_index = 0U; if (c->start_level() == 0) { for (auto f : *c->inputs(0)) { - assert(f->smallest_seqno <= f->largest_seqno); + assert(f->fd.smallest_seqno <= f->fd.largest_seqno); if (is_first) { is_first = false; } - prev_smallest_seqno = f->smallest_seqno; + prev_smallest_seqno = f->fd.smallest_seqno; } level_index = 1U; } diff --git a/db/db_impl.cc b/db/db_impl.cc index 0d6158a89..ba9fd46fe 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2932,8 +2932,8 @@ Status DBImpl::IngestExternalFile( } SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); - status = ingestion_job.Prepare(external_files, next_file_number, - super_version); + status = + ingestion_job.Prepare(external_files, next_file_number, super_version); CleanupSuperVersion(super_version); if (!status.ok()) { InstrumentedMutexLock l(&mutex_); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index b71063818..18727e812 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -230,8 +230,8 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, info.job_id = job_id; info.triggered_writes_slowdown = triggered_writes_slowdown; info.triggered_writes_stop = triggered_writes_stop; - info.smallest_seqno = file_meta->smallest_seqno; - info.largest_seqno = file_meta->largest_seqno; + info.smallest_seqno = file_meta->fd.smallest_seqno; + info.largest_seqno = file_meta->fd.largest_seqno; info.table_properties = prop; info.flush_reason = cfd->GetFlushReason(); for (auto listener : immutable_db_options_.listeners) { @@ -281,8 +281,8 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, info.job_id = job_id; info.triggered_writes_slowdown = triggered_writes_slowdown; info.triggered_writes_stop = triggered_writes_stop; - info.smallest_seqno = file_meta->smallest_seqno; - info.largest_seqno = file_meta->largest_seqno; + info.smallest_seqno = file_meta->fd.smallest_seqno; + info.largest_seqno = file_meta->fd.largest_seqno; info.table_properties = prop; info.flush_reason = cfd->GetFlushReason(); for (auto listener : immutable_db_options_.listeners) { @@ -885,7 +885,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { edit.DeleteFile(level, f->fd.GetNumber()); edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, - f->smallest_seqno, f->largest_seqno, + f->fd.smallest_seqno, f->fd.largest_seqno, f->marked_for_compaction); } ROCKS_LOG_DEBUG(immutable_db_options_.info_log, @@ -1804,8 +1804,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, c->edit()->DeleteFile(c->level(l), f->fd.GetNumber()); c->edit()->AddFile(c->output_level(), f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, - f->largest, f->smallest_seqno, f->largest_seqno, - f->marked_for_compaction); + f->largest, f->fd.smallest_seqno, + f->fd.largest_seqno, f->marked_for_compaction); ROCKS_LOG_BUFFER( log_buffer, diff --git a/db/db_impl_experimental.cc b/db/db_impl_experimental.cc index 961c284c9..def5755a3 100644 --- a/db/db_impl_experimental.cc +++ b/db/db_impl_experimental.cc @@ -131,7 +131,7 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) { edit.DeleteFile(0, f->fd.GetNumber()); edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, - f->smallest_seqno, f->largest_seqno, + f->fd.smallest_seqno, f->fd.largest_seqno, 
f->marked_for_compaction); } diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 27c01658c..6b639191c 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -969,7 +969,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, if (s.ok() && meta.fd.GetFileSize() > 0) { edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), meta.fd.GetFileSize(), meta.smallest, meta.largest, - meta.smallest_seqno, meta.largest_seqno, + meta.fd.smallest_seqno, meta.fd.largest_seqno, meta.marked_for_compaction); } diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc index 1c01cdd41..9ca063335 100644 --- a/db/db_range_del_test.cc +++ b/db/db_range_del_test.cc @@ -191,7 +191,7 @@ TEST_F(DBRangeDelTest, SentinelsOmittedFromOutputFile) { std::vector> files; dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files); - ASSERT_GT(files[0][0].smallest_seqno, 0); + ASSERT_GT(files[0][0].fd.smallest_seqno, 0); db_->ReleaseSnapshot(snapshot); } diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index a3761e0d4..c31a5d6f5 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -30,8 +30,7 @@ namespace rocksdb { Status ExternalSstFileIngestionJob::Prepare( const std::vector& external_files_paths, - uint64_t next_file_number, - SuperVersion* sv) { + uint64_t next_file_number, SuperVersion* sv) { Status status; // Read the information of files we are ingesting @@ -477,9 +476,9 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( const SequenceNumber level_largest_seqno = (*max_element(level_files.begin(), level_files.end(), [](FileMetaData* f1, FileMetaData* f2) { - return f1->largest_seqno < f2->largest_seqno; + return f1->fd.largest_seqno < f2->fd.largest_seqno; })) - ->largest_seqno; + ->fd.largest_seqno; // should only assign seqno to current level's largest seqno when // the file fits if (level_largest_seqno != 0 && @@ -524,7 +523,7 @@ Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile( // at some upper level for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) { for (auto file : vstorage->LevelFiles(lvl)) { - if (file->smallest_seqno == 0) { + if (file->fd.smallest_seqno == 0) { return Status::InvalidArgument( "Can't ingest_behind file as despite allow_ingest_behind=true " "there are files with 0 seqno in database at upper levels!"); @@ -549,24 +548,27 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile( "field"); } - std::unique_ptr rwfile; - Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path, - &rwfile, env_options_); - if (!status.ok()) { - return status; + if (ingestion_options_.write_global_seqno) { + // Determine if we can write global_seqno to a given offset of file. + // If the file system does not support random write, then we should not. + // Otherwise we should. 
+ std::unique_ptr rwfile; + Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path, + &rwfile, env_options_); + if (status.ok()) { + std::string seqno_val; + PutFixed64(&seqno_val, seqno); + status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val); + if (!status.ok()) { + return status; + } + } else if (!status.IsNotSupported()) { + return status; + } } - // Write the new seqno in the global sequence number field in the file - std::string seqno_val; - PutFixed64(&seqno_val, seqno); - status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val); - if (status.ok()) { - status = rwfile->Fsync(); - } - if (status.ok()) { - file_to_ingest->assigned_seqno = seqno; - } - return status; + file_to_ingest->assigned_seqno = seqno; + return Status::OK(); } bool ExternalSstFileIngestionJob::IngestedFileFitInLevel( diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 29981bc10..0f14e3606 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -89,8 +89,7 @@ class ExternalSstFileIngestionJob { // Prepare the job by copying external files into the DB. Status Prepare(const std::vector& external_files_paths, - uint64_t next_file_number, - SuperVersion* sv); + uint64_t next_file_number, SuperVersion* sv); // Check if we need to flush the memtable before running the ingestion job // This will be true if the files we are ingesting are overlapping with any diff --git a/db/flush_job.cc b/db/flush_job.cc index 06132fb86..c7a299ece 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -389,7 +389,7 @@ Status FlushJob::WriteLevel0Table() { // Add file to L0 edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(), meta_.fd.GetFileSize(), meta_.smallest, meta_.largest, - meta_.smallest_seqno, meta_.largest_seqno, + meta_.fd.smallest_seqno, meta_.fd.largest_seqno, meta_.marked_for_compaction); } diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 5535fc46b..324037478 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -147,19 +147,20 @@ TEST_F(FlushJobTest, NonEmpty) { db_options_.statistics.get(), &event_logger, true); HistogramData hist; - FileMetaData fd; + FileMetaData file_meta; mutex_.Lock(); flush_job.PickMemTable(); - ASSERT_OK(flush_job.Run(nullptr, &fd)); + ASSERT_OK(flush_job.Run(nullptr, &file_meta)); mutex_.Unlock(); db_options_.statistics->histogramData(FLUSH_TIME, &hist); ASSERT_GT(hist.average, 0.0); - ASSERT_EQ(ToString(0), fd.smallest.user_key().ToString()); - ASSERT_EQ("9999a", - fd.largest.user_key().ToString()); // range tombstone end key - ASSERT_EQ(1, fd.smallest_seqno); - ASSERT_EQ(10000, fd.largest_seqno); // range tombstone seqnum 10000 + ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString()); + ASSERT_EQ( + "9999a", + file_meta.largest.user_key().ToString()); // range tombstone end key + ASSERT_EQ(1, file_meta.fd.smallest_seqno); + ASSERT_EQ(10000, file_meta.fd.largest_seqno); // range tombstone seqnum 10000 mock_table_factory_->AssertSingleFile(inserted_keys); job_context.Clean(); } diff --git a/db/table_cache.cc b/db/table_cache.cc index d8a95b969..edfffb0e8 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -120,7 +120,7 @@ Status TableCache::GetTableReader( s = ioptions_.table_factory->NewTableReader( TableReaderOptions(ioptions_, prefix_extractor, env_options, internal_comparator, skip_filters, immortal_tables_, - level), + level, fd.largest_seqno), std::move(file_reader), fd.GetFileSize(), table_reader, 
prefetch_index_and_filter_in_cache); TEST_SYNC_POINT("TableCache::GetTableReader:0"); diff --git a/db/version_builder.cc b/db/version_builder.cc index 9b034c5c8..d6ad0c55f 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -35,11 +35,11 @@ namespace rocksdb { bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { - if (a->largest_seqno != b->largest_seqno) { - return a->largest_seqno > b->largest_seqno; + if (a->fd.largest_seqno != b->fd.largest_seqno) { + return a->fd.largest_seqno > b->fd.largest_seqno; } - if (a->smallest_seqno != b->smallest_seqno) { - return a->smallest_seqno > b->smallest_seqno; + if (a->fd.smallest_seqno != b->fd.smallest_seqno) { + return a->fd.smallest_seqno > b->fd.smallest_seqno; } // Break ties by file number return a->fd.GetNumber() > b->fd.GetNumber(); @@ -162,22 +162,24 @@ class VersionBuilder::Rep { abort(); } - if (f2->smallest_seqno == f2->largest_seqno) { + if (f2->fd.smallest_seqno == f2->fd.largest_seqno) { // This is an external file that we ingested - SequenceNumber external_file_seqno = f2->smallest_seqno; - if (!(external_file_seqno < f1->largest_seqno || + SequenceNumber external_file_seqno = f2->fd.smallest_seqno; + if (!(external_file_seqno < f1->fd.largest_seqno || external_file_seqno == 0)) { - fprintf(stderr, "L0 file with seqno %" PRIu64 " %" PRIu64 - " vs. file with global_seqno %" PRIu64 "\n", - f1->smallest_seqno, f1->largest_seqno, + fprintf(stderr, + "L0 file with seqno %" PRIu64 " %" PRIu64 + " vs. file with global_seqno %" PRIu64 "\n", + f1->fd.smallest_seqno, f1->fd.largest_seqno, external_file_seqno); abort(); } - } else if (f1->smallest_seqno <= f2->smallest_seqno) { - fprintf(stderr, "L0 files seqno %" PRIu64 " %" PRIu64 - " vs. %" PRIu64 " %" PRIu64 "\n", - f1->smallest_seqno, f1->largest_seqno, f2->smallest_seqno, - f2->largest_seqno); + } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) { + fprintf(stderr, + "L0 files seqno %" PRIu64 " %" PRIu64 " vs. 
%" PRIu64 + " %" PRIu64 "\n", + f1->fd.smallest_seqno, f1->fd.largest_seqno, + f2->fd.smallest_seqno, f2->fd.largest_seqno); abort(); } } else { diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc index 304df2a04..0459cf777 100644 --- a/db/version_builder_test.cc +++ b/db/version_builder_test.cc @@ -63,8 +63,8 @@ class VersionBuilderTest : public testing::Test { f->fd = FileDescriptor(file_number, path_id, file_size); f->smallest = GetInternalKey(smallest, smallest_seq); f->largest = GetInternalKey(largest, largest_seq); - f->smallest_seqno = smallest_seqno; - f->largest_seqno = largest_seqno; + f->fd.smallest_seqno = smallest_seqno; + f->fd.largest_seqno = largest_seqno; f->compensated_file_size = file_size; f->refs = 0; f->num_entries = num_entries; diff --git a/db/version_edit.cc b/db/version_edit.cc index ad916036a..40d25999e 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -135,7 +135,7 @@ bool VersionEdit::EncodeTo(std::string* dst) const { PutVarint64(dst, f.fd.GetFileSize()); PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode()); - PutVarint64Varint64(dst, f.smallest_seqno, f.largest_seqno); + PutVarint64Varint64(dst, f.fd.smallest_seqno, f.fd.largest_seqno); if (has_customized_fields) { // Customized fields' format: // +-----------------------------+ @@ -233,14 +233,16 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { uint64_t number; uint32_t path_id = 0; uint64_t file_size; + SequenceNumber smallest_seqno; + SequenceNumber largest_seqno; // Since this is the only forward-compatible part of the code, we hack new // extension into this record. When we do, we set this boolean to distinguish // the record from the normal NewFile records. if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) && GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) && GetInternalKey(input, &f.largest) && - GetVarint64(input, &f.smallest_seqno) && - GetVarint64(input, &f.largest_seqno)) { + GetVarint64(input, &smallest_seqno) && + GetVarint64(input, &largest_seqno)) { // See comments in VersionEdit::EncodeTo() for format of customized fields while (true) { uint32_t custom_tag; @@ -289,7 +291,8 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { } else { return "new-file4 entry"; } - f.fd = FileDescriptor(number, path_id, file_size); + f.fd = + FileDescriptor(number, path_id, file_size, smallest_seqno, largest_seqno); new_files_.push_back(std::make_pair(level, f)); return nullptr; } @@ -409,13 +412,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) { case kNewFile2: { uint64_t number; uint64_t file_size; + SequenceNumber smallest_seqno; + SequenceNumber largest_seqno; if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) && GetVarint64(&input, &file_size) && GetInternalKey(&input, &f.smallest) && GetInternalKey(&input, &f.largest) && - GetVarint64(&input, &f.smallest_seqno) && - GetVarint64(&input, &f.largest_seqno)) { - f.fd = FileDescriptor(number, 0, file_size); + GetVarint64(&input, &smallest_seqno) && + GetVarint64(&input, &largest_seqno)) { + f.fd = FileDescriptor(number, 0, file_size, smallest_seqno, + largest_seqno); new_files_.push_back(std::make_pair(level, f)); } else { if (!msg) { @@ -429,13 +435,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) { uint64_t number; uint32_t path_id; uint64_t file_size; + SequenceNumber smallest_seqno; + SequenceNumber largest_seqno; if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) && 
GetVarint32(&input, &path_id) && GetVarint64(&input, &file_size) && GetInternalKey(&input, &f.smallest) && GetInternalKey(&input, &f.largest) && - GetVarint64(&input, &f.smallest_seqno) && - GetVarint64(&input, &f.largest_seqno)) { - f.fd = FileDescriptor(number, path_id, file_size); + GetVarint64(&input, &smallest_seqno) && + GetVarint64(&input, &largest_seqno)) { + f.fd = FileDescriptor(number, path_id, file_size, smallest_seqno, + largest_seqno); new_files_.push_back(std::make_pair(level, f)); } else { if (!msg) { diff --git a/db/version_edit.h b/db/version_edit.h index 5728827a2..28028d1e4 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -36,18 +36,28 @@ struct FileDescriptor { TableReader* table_reader; uint64_t packed_number_and_path_id; uint64_t file_size; // File size in bytes + SequenceNumber smallest_seqno; // The smallest seqno in this file + SequenceNumber largest_seqno; // The largest seqno in this file FileDescriptor() : FileDescriptor(0, 0, 0) {} FileDescriptor(uint64_t number, uint32_t path_id, uint64_t _file_size) + : FileDescriptor(number, path_id, _file_size, kMaxSequenceNumber, 0) {} + + FileDescriptor(uint64_t number, uint32_t path_id, uint64_t _file_size, + SequenceNumber _smallest_seqno, SequenceNumber _largest_seqno) : table_reader(nullptr), packed_number_and_path_id(PackFileNumberAndPathId(number, path_id)), - file_size(_file_size) {} + file_size(_file_size), + smallest_seqno(_smallest_seqno), + largest_seqno(_largest_seqno) {} FileDescriptor& operator=(const FileDescriptor& fd) { table_reader = fd.table_reader; packed_number_and_path_id = fd.packed_number_and_path_id; file_size = fd.file_size; + smallest_seqno = fd.smallest_seqno; + largest_seqno = fd.largest_seqno; return *this; } @@ -77,8 +87,6 @@ struct FileMetaData { FileDescriptor fd; InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table - SequenceNumber smallest_seqno; // The smallest seqno in this file - SequenceNumber largest_seqno; // The largest seqno in this file // Needs to be disposed when refs becomes 0. Cache::Handle* table_reader_handle; @@ -108,9 +116,7 @@ struct FileMetaData { // file. 
FileMetaData() - : smallest_seqno(kMaxSequenceNumber), - largest_seqno(0), - table_reader_handle(nullptr), + : table_reader_handle(nullptr), compensated_file_size(0), num_entries(0), num_deletions(0), @@ -128,8 +134,8 @@ struct FileMetaData { smallest.DecodeFrom(key); } largest.DecodeFrom(key); - smallest_seqno = std::min(smallest_seqno, seqno); - largest_seqno = std::max(largest_seqno, seqno); + fd.smallest_seqno = std::min(fd.smallest_seqno, seqno); + fd.largest_seqno = std::max(fd.largest_seqno, seqno); } // Unlike UpdateBoundaries, ranges do not need to be presented in any @@ -143,8 +149,8 @@ struct FileMetaData { if (largest.size() == 0 || icmp.Compare(largest, end) < 0) { largest = end; } - smallest_seqno = std::min(smallest_seqno, seqno); - largest_seqno = std::max(largest_seqno, seqno); + fd.smallest_seqno = std::min(fd.smallest_seqno, seqno); + fd.largest_seqno = std::max(fd.largest_seqno, seqno); } }; @@ -233,17 +239,18 @@ class VersionEdit { bool marked_for_compaction) { assert(smallest_seqno <= largest_seqno); FileMetaData f; - f.fd = FileDescriptor(file, file_path_id, file_size); + f.fd = FileDescriptor(file, file_path_id, file_size, smallest_seqno, + largest_seqno); f.smallest = smallest; f.largest = largest; - f.smallest_seqno = smallest_seqno; - f.largest_seqno = largest_seqno; + f.fd.smallest_seqno = smallest_seqno; + f.fd.largest_seqno = largest_seqno; f.marked_for_compaction = marked_for_compaction; new_files_.emplace_back(level, std::move(f)); } void AddFile(int level, const FileMetaData& f) { - assert(f.smallest_seqno <= f.largest_seqno); + assert(f.fd.smallest_seqno <= f.fd.largest_seqno); new_files_.emplace_back(level, f); } diff --git a/db/version_set.cc b/db/version_set.cc index 932b9d598..d6c83c45d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -897,8 +897,8 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) { } files.emplace_back( MakeTableFileName("", file->fd.GetNumber()), file_path, - file->fd.GetFileSize(), file->smallest_seqno, file->largest_seqno, - file->smallest.user_key().ToString(), + file->fd.GetFileSize(), file->fd.smallest_seqno, + file->fd.largest_seqno, file->smallest.user_key().ToString(), file->largest.user_key().ToString(), file->stats.num_reads_sampled.load(std::memory_order_relaxed), file->being_compacted); @@ -1891,13 +1891,15 @@ void VersionStorageInfo::UpdateFilesByCompactionPri( case kOldestLargestSeqFirst: std::sort(temp.begin(), temp.end(), [](const Fsize& f1, const Fsize& f2) -> bool { - return f1.file->largest_seqno < f2.file->largest_seqno; + return f1.file->fd.largest_seqno < + f2.file->fd.largest_seqno; }); break; case kOldestSmallestSeqFirst: std::sort(temp.begin(), temp.end(), [](const Fsize& f1, const Fsize& f2) -> bool { - return f1.file->smallest_seqno < f2.file->smallest_seqno; + return f1.file->fd.smallest_seqno < + f2.file->fd.smallest_seqno; }); break; case kMinOverlappingRatio: @@ -1981,17 +1983,17 @@ void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() { bottommost_files_mark_threshold_ = kMaxSequenceNumber; for (auto& level_and_file : bottommost_files_) { if (!level_and_file.second->being_compacted && - level_and_file.second->largest_seqno != 0 && + level_and_file.second->fd.largest_seqno != 0 && level_and_file.second->num_deletions > 1) { // largest_seqno might be nonzero due to containing the final key in an // earlier compaction, whose seqnum we didn't zero out. Multiple deletions // ensures the file really contains deleted or overwritten keys. 
- if (level_and_file.second->largest_seqno < oldest_snapshot_seqnum_) { + if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) { bottommost_files_marked_for_compaction_.push_back(level_and_file); } else { bottommost_files_mark_threshold_ = std::min(bottommost_files_mark_threshold_, - level_and_file.second->largest_seqno); + level_and_file.second->fd.largest_seqno); } } } @@ -2417,7 +2419,7 @@ const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch, AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt)); int ret = snprintf(scratch->buffer + len, sz, "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ", - f->fd.GetNumber(), f->smallest_seqno, sztxt, + f->fd.GetNumber(), f->fd.smallest_seqno, sztxt, static_cast(f->being_compacted)); if (ret < 0 || ret >= sz) break; @@ -3963,7 +3965,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { cfd->current()->storage_info()->LevelFiles(level)) { edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, - f->smallest_seqno, f->largest_seqno, + f->fd.smallest_seqno, f->fd.largest_seqno, f->marked_for_compaction); } } @@ -4288,8 +4290,8 @@ void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { filemetadata.size = file->fd.GetFileSize(); filemetadata.smallestkey = file->smallest.user_key().ToString(); filemetadata.largestkey = file->largest.user_key().ToString(); - filemetadata.smallest_seqno = file->smallest_seqno; - filemetadata.largest_seqno = file->largest_seqno; + filemetadata.smallest_seqno = file->fd.smallest_seqno; + filemetadata.largest_seqno = file->fd.largest_seqno; metadata->push_back(filemetadata); } } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index f2966ef79..0a07c3ced 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1267,6 +1267,18 @@ struct IngestExternalFileOptions { // with allow_ingest_behind=true since the dawn of time. // All files will be ingested at the bottommost level with seqno=0. bool ingest_behind = false; + // Set to true if you would like to write global_seqno to a given offset in + // the external SST file for backward compatibility. Older versions of + // RocksDB writes a global_seqno to a given offset within ingested SST files, + // and new versions of RocksDB do not. If you ingest an external SST using + // new version of RocksDB and would like to be able to downgrade to an + // older version of RocksDB, you should set 'write_global_seqno' to true. If + // your service is just starting to use the new RocksDB, we recommend that + // you set this option to false, which brings two benefits: + // 1. No extra random write for global_seqno during ingestion. + // 2. Without writing external SST file, it's possible to do checksum. + // We have a plan to set this option to false by default in the future. 
+ bool write_global_seqno = true; }; } // namespace rocksdb diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index e56dcf644..ad5838315 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -203,7 +203,7 @@ Status BlockBasedTableFactory::NewTableReader( file_size, table_reader, table_reader_options.prefix_extractor, prefetch_index_and_filter_in_cache, table_reader_options.skip_filters, table_reader_options.level, table_reader_options.immortal, - &tail_prefetch_stats_); + table_reader_options.largest_seqno, &tail_prefetch_stats_); } TableBuilder* BlockBasedTableFactory::NewTableBuilder( diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index e2d92529b..cd292a3fe 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -9,6 +9,7 @@ #include "table/block_based_table_reader.h" #include +#include #include #include #include @@ -663,51 +664,71 @@ bool IsFeatureSupported(const TableProperties& table_properties, return true; } -SequenceNumber GetGlobalSequenceNumber(const TableProperties& table_properties, - Logger* info_log) { - auto& props = table_properties.user_collected_properties; - - auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion); - auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno); +// Caller has to ensure seqno is not nullptr. +Status GetGlobalSequenceNumber(const TableProperties& table_properties, + SequenceNumber largest_seqno, + SequenceNumber* seqno) { + const auto& props = table_properties.user_collected_properties; + const auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion); + const auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno); + *seqno = kDisableGlobalSequenceNumber; if (version_pos == props.end()) { if (seqno_pos != props.end()) { + std::array msg_buf; // This is not an external sst file, global_seqno is not supported. - assert(false); - ROCKS_LOG_ERROR( - info_log, + snprintf( + msg_buf.data(), msg_buf.max_size(), "A non-external sst file have global seqno property with value %s", seqno_pos->second.c_str()); + return Status::Corruption(msg_buf.data()); } - return kDisableGlobalSequenceNumber; + return Status::OK(); } uint32_t version = DecodeFixed32(version_pos->second.c_str()); if (version < 2) { if (seqno_pos != props.end() || version != 1) { + std::array msg_buf; // This is a v1 external sst file, global_seqno is not supported. - assert(false); - ROCKS_LOG_ERROR( - info_log, - "An external sst file with version %u have global seqno property " - "with value %s", - version, seqno_pos->second.c_str()); + snprintf(msg_buf.data(), msg_buf.max_size(), + "An external sst file with version %u have global seqno " + "property with value %s", + version, seqno_pos->second.c_str()); + return Status::Corruption(msg_buf.data()); } - return kDisableGlobalSequenceNumber; + return Status::OK(); } - SequenceNumber global_seqno = DecodeFixed64(seqno_pos->second.c_str()); + // Since we have a plan to deprecate global_seqno, we do not return failure + // if seqno_pos == props.end(). We rely on version_pos to detect whether the + // SST is external. 
+ SequenceNumber global_seqno(0); + if (seqno_pos != props.end()) { + global_seqno = DecodeFixed64(seqno_pos->second.c_str()); + } + if (global_seqno != 0 && global_seqno != largest_seqno) { + std::array msg_buf; + snprintf(msg_buf.data(), msg_buf.max_size(), + "An external sst file with version %u have global seqno property " + "with value %s, while largest seqno in the file is %llu", + version, seqno_pos->second.c_str(), + static_cast(largest_seqno)); + return Status::Corruption(msg_buf.data()); + } + global_seqno = largest_seqno; + *seqno = largest_seqno; if (global_seqno > kMaxSequenceNumber) { - assert(false); - ROCKS_LOG_ERROR( - info_log, - "An external sst file with version %u have global seqno property " - "with value %llu, which is greater than kMaxSequenceNumber", - version, global_seqno); + std::array msg_buf; + snprintf(msg_buf.data(), msg_buf.max_size(), + "An external sst file with version %u have global seqno property " + "with value %llu, which is greater than kMaxSequenceNumber", + version, static_cast(global_seqno)); + return Status::Corruption(msg_buf.data()); } - return global_seqno; + return Status::OK(); } } // namespace @@ -734,6 +755,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, const bool prefetch_index_and_filter_in_cache, const bool skip_filters, const int level, const bool immortal_table, + const SequenceNumber largest_seqno, TailPrefetchStats* tail_prefetch_stats) { table_reader->reset(); @@ -936,8 +958,12 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, *(rep->table_properties), BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log); - rep->global_seqno = GetGlobalSequenceNumber(*(rep->table_properties), - rep->ioptions.info_log); + s = GetGlobalSequenceNumber(*(rep->table_properties), largest_seqno, + &(rep->global_seqno)); + if (!s.ok()) { + ROCKS_LOG_ERROR(rep->ioptions.info_log, "%s", s.ToString().c_str()); + return s; + } } // Read the range del meta block diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 98a81e986..7e7b41a71 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -95,6 +95,7 @@ class BlockBasedTable : public TableReader { bool prefetch_index_and_filter_in_cache = true, bool skip_filters = false, int level = -1, const bool immortal_table = false, + const SequenceNumber largest_seqno = 0, TailPrefetchStats* tail_prefetch_stats = nullptr); bool PrefixMayMatch(const Slice& internal_key, diff --git a/table/table_builder.h b/table/table_builder.h index fdf933915..0665fac82 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -13,6 +13,7 @@ #include #include #include +#include "db/dbformat.h" #include "db/table_properties_collector.h" #include "options/cf_options.h" #include "rocksdb/options.h" @@ -32,13 +33,25 @@ struct TableReaderOptions { const InternalKeyComparator& _internal_comparator, bool _skip_filters = false, bool _immortal = false, int _level = -1) + : TableReaderOptions(_ioptions, _prefix_extractor, _env_options, + _internal_comparator, _skip_filters, _immortal, + _level, 0 /* _largest_seqno */) {} + + // @param skip_filters Disables loading/accessing the filter block + TableReaderOptions(const ImmutableCFOptions& _ioptions, + const SliceTransform* _prefix_extractor, + const EnvOptions& _env_options, + const InternalKeyComparator& _internal_comparator, + bool _skip_filters, bool _immortal, int _level, + SequenceNumber _largest_seqno) : ioptions(_ioptions), prefix_extractor(_prefix_extractor), 
env_options(_env_options), internal_comparator(_internal_comparator), skip_filters(_skip_filters), immortal(_immortal), - level(_level) {} + level(_level), + largest_seqno(_largest_seqno) {} const ImmutableCFOptions& ioptions; const SliceTransform* prefix_extractor; @@ -50,6 +63,8 @@ struct TableReaderOptions { bool immortal; // what level this table/file is on, -1 for "not set, don't know" int level; + // largest seqno in the table + SequenceNumber largest_seqno; }; struct TableBuilderOptions { diff --git a/table/table_test.cc b/table/table_test.cc index 080e44f00..9ce98ab47 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -3132,7 +3132,14 @@ TEST_F(PrefixTest, PrefixAndWholeKeyTest) { // rocksdb still works. } -TEST_P(BlockBasedTableTest, TableWithGlobalSeqno) { +/* + * Disable TableWithGlobalSeqno since RocksDB does not store global_seqno in + * the SST file any more. Instead, RocksDB deduces global_seqno from the + * MANIFEST while reading from an SST. Therefore, it's not possible to test the + * functionality of global_seqno in a single, isolated unit test without the + * involvement of Version, VersionSet, etc. + */ +TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) { BlockBasedTableOptions bbto = GetBlockBasedTableOptions(); test::StringSink* sink = new test::StringSink(); unique_ptr file_writer(test::GetWritableFileWriter(sink)); diff --git a/tools/db_stress.cc b/tools/db_stress.cc index e599df93e..ce16ee824 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1634,8 +1634,8 @@ class StressTest { snprintf(buf, 4, "%X", value[i]); tmp.append(buf); } - fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") %s\n", - cf, key, sz, tmp.c_str()); + fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") %s\n", cf, + key, sz, tmp.c_str()); } static int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {
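
Editor's note: below is a minimal usage sketch (not part of the patch) of the write_global_seqno option introduced above. The DB path, SST file path, and key/value contents are hypothetical placeholders; the API calls (SstFileWriter, DB::Open, DB::IngestExternalFile, IngestExternalFileOptions) are the existing public RocksDB interfaces this patch builds on. With write_global_seqno = false, ingestion no longer performs a random write into the SST file; the global seqno is instead deduced later from the MANIFEST and table properties, which is what allows ingestion to work on file systems without random-write support.

// Minimal sketch: build an external SST file and ingest it without an
// in-place global_seqno update. Paths and keys are placeholders.
#include <cassert>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/sst_file_writer.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  // 1. Write an external SST file with SstFileWriter.
  rocksdb::SstFileWriter sst_writer(rocksdb::EnvOptions(), options);
  rocksdb::Status s = sst_writer.Open("/tmp/example_ingest.sst");
  assert(s.ok());
  s = sst_writer.Put("key1", "value1");
  assert(s.ok());
  s = sst_writer.Finish();
  assert(s.ok());

  // 2. Ingest the file. write_global_seqno = false leaves the file untouched
  //    on disk; the assigned global seqno is tracked via the MANIFEST instead.
  rocksdb::DB* db = nullptr;
  s = rocksdb::DB::Open(options, "/tmp/example_db", &db);
  assert(s.ok());

  rocksdb::IngestExternalFileOptions ifo;
  ifo.write_global_seqno = false;  // defaults to true for backward compatibility
  s = db->IngestExternalFile({"/tmp/example_ingest.sst"}, ifo);
  assert(s.ok());

  delete db;
  return 0;
}

Keeping write_global_seqno at its default of true preserves the old in-place update and matters mainly if the ingested files may later be opened by a version of RocksDB that predates this change.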