From 7eddecce125af3e4ff1fd55d66e5760cb12b6bb7 Mon Sep 17 00:00:00 2001 From: Aaron Gao Date: Wed, 26 Apr 2017 13:28:39 -0700 Subject: [PATCH] support bulk loading with universal compaction Summary: Support buck load with universal compaction. More test cases to be added. Closes https://github.com/facebook/rocksdb/pull/2202 Differential Revision: D4935360 Pulled By: lightmark fbshipit-source-id: cc3ca1b6f42faa503207dab1408d6bcf393ee5b5 --- db/compaction_picker_universal.cc | 2 - db/db_impl.cc | 3 +- db/external_sst_file_basic_test.cc | 174 +++++++++++++------------- db/external_sst_file_ingestion_job.cc | 68 +++++++--- db/external_sst_file_ingestion_job.h | 12 +- db/external_sst_file_test.cc | 61 ++++++--- 6 files changed, 191 insertions(+), 129 deletions(-) diff --git a/db/compaction_picker_universal.cc b/db/compaction_picker_universal.cc index 0c61efe86..a14ea6473 100644 --- a/db/compaction_picker_universal.cc +++ b/db/compaction_picker_universal.cc @@ -342,8 +342,6 @@ Compaction* UniversalCompactionPicker::PickCompaction( assert(f->smallest_seqno <= f->largest_seqno); if (is_first) { is_first = false; - } else { - assert(prev_smallest_seqno > f->largest_seqno); } prev_smallest_seqno = f->smallest_seqno; } diff --git a/db/db_impl.cc b/db/db_impl.cc index fd934bc3f..19046f3fd 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2532,7 +2532,8 @@ Status DBImpl::IngestExternalFile( if (status.ok()) { bool need_flush = false; status = ingestion_job.NeedsFlush(&need_flush); - + TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush", + &need_flush); if (status.ok() && need_flush) { mutex_.Unlock(); status = FlushMemTable(cfd, FlushOptions(), true /* writes_stopped */); diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index 9c6e08e7a..66a0220be 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -182,92 +182,94 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) { } TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) { - Options options = CurrentOptions(); - DestroyAndReopen(options); - std::map true_data; - - int file_id = 1; - - ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6}, file_id++, - &true_data)); - // File dont overwrite any keys, No seqno needed - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); - - ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13}, file_id++, - &true_data)); - // File dont overwrite any keys, No seqno needed - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); - - ASSERT_OK( - GenerateAndAddExternalFile(options, {1, 4, 6}, file_id++, &true_data)); - // File overwrite some keys, a seqno will be assigned - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1); - - ASSERT_OK( - GenerateAndAddExternalFile(options, {11, 15, 19}, file_id++, &true_data)); - // File overwrite some keys, a seqno will be assigned - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); - - ASSERT_OK( - GenerateAndAddExternalFile(options, {120, 130}, file_id++, &true_data)); - // File dont overwrite any keys, No seqno needed - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); - - ASSERT_OK( - GenerateAndAddExternalFile(options, {1, 130}, file_id++, &true_data)); - // File overwrite some keys, a seqno will be assigned - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3); - - // Write some keys through normal write path - for (int i = 0; i < 50; i++) { - ASSERT_OK(Put(Key(i), "memtable")); - true_data[Key(i)] = "memtable"; - } - SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber(); - - ASSERT_OK( - GenerateAndAddExternalFile(options, {60, 61, 62}, file_id++, &true_data)); - // File dont overwrite any keys, No seqno needed - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno); - - ASSERT_OK( - GenerateAndAddExternalFile(options, {40, 41, 42}, file_id++, &true_data)); - // File overwrite some keys, a seqno will be assigned - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1); - - ASSERT_OK( - GenerateAndAddExternalFile(options, {20, 30, 40}, file_id++, &true_data)); - // File overwrite some keys, a seqno will be assigned - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2); - - const Snapshot* snapshot = db_->GetSnapshot(); - - // We will need a seqno for the file regardless if the file overwrite - // keys in the DB or not because we have a snapshot - ASSERT_OK( - GenerateAndAddExternalFile(options, {1000, 1002}, file_id++, &true_data)); - // A global seqno will be assigned anyway because of the snapshot - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3); - - ASSERT_OK( - GenerateAndAddExternalFile(options, {2000, 3002}, file_id++, &true_data)); - // A global seqno will be assigned anyway because of the snapshot - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4); - - ASSERT_OK(GenerateAndAddExternalFile(options, {1, 20, 40, 100, 150}, - file_id++, &true_data)); - // A global seqno will be assigned anyway because of the snapshot - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); - - db_->ReleaseSnapshot(snapshot); - - ASSERT_OK( - GenerateAndAddExternalFile(options, {5000, 5001}, file_id++, &true_data)); - // No snapshot anymore, no need to assign a seqno - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); - - size_t kcnt = 0; - VerifyDBFromMap(true_data, &kcnt, false); + do { + Options options = CurrentOptions(); + DestroyAndReopen(options); + std::map true_data; + + int file_id = 1; + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6}, file_id++, + &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); + + ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13}, file_id++, + &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); + + ASSERT_OK( + GenerateAndAddExternalFile(options, {1, 4, 6}, file_id++, &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1); + + ASSERT_OK(GenerateAndAddExternalFile(options, {11, 15, 19}, file_id++, + &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); + + ASSERT_OK( + GenerateAndAddExternalFile(options, {120, 130}, file_id++, &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); + + ASSERT_OK( + GenerateAndAddExternalFile(options, {1, 130}, file_id++, &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3); + + // Write some keys through normal write path + for (int i = 0; i < 50; i++) { + ASSERT_OK(Put(Key(i), "memtable")); + true_data[Key(i)] = "memtable"; + } + SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber(); + + ASSERT_OK(GenerateAndAddExternalFile(options, {60, 61, 62}, file_id++, + &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno); + + ASSERT_OK(GenerateAndAddExternalFile(options, {40, 41, 42}, file_id++, + &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1); + + ASSERT_OK(GenerateAndAddExternalFile(options, {20, 30, 40}, file_id++, + &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2); + + const Snapshot* snapshot = db_->GetSnapshot(); + + // We will need a seqno for the file regardless if the file overwrite + // keys in the DB or not because we have a snapshot + ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1002}, file_id++, + &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3); + + ASSERT_OK(GenerateAndAddExternalFile(options, {2000, 3002}, file_id++, + &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4); + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 20, 40, 100, 150}, + file_id++, &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); + + db_->ReleaseSnapshot(snapshot); + + ASSERT_OK(GenerateAndAddExternalFile(options, {5000, 5001}, file_id++, + &true_data)); + // No snapshot anymore, no need to assign a seqno + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); + + size_t kcnt = 0; + VerifyDBFromMap(true_data, &kcnt, false); + } while (ChangeCompactOptions()); } TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) { diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 28ccc7dc7..9a6b3b29d 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -156,34 +156,34 @@ Status ExternalSstFileIngestionJob::Run() { bool consumed_seqno = false; bool force_global_seqno = false; - const SequenceNumber last_seqno = versions_->LastSequence(); + if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) { // We need to assign a global sequence number to all the files even // if the dont overlap with any ranges since we have snapshots force_global_seqno = true; } - + const SequenceNumber last_seqno = versions_->LastSequence(); SuperVersion* super_version = cfd_->GetSuperVersion(); edit_.SetColumnFamily(cfd_->GetID()); // The levels that the files will be ingested into + for (IngestedFileInfo& f : files_to_ingest_) { - bool overlap_with_db = false; - status = AssignLevelForIngestedFile(super_version, &f, &overlap_with_db); + SequenceNumber assigned_seqno = 0; + status = AssignLevelAndSeqnoForIngestedFile( + super_version, force_global_seqno, cfd_->ioptions()->compaction_style, + &f, &assigned_seqno); if (!status.ok()) { return status; } - - if (overlap_with_db || force_global_seqno) { - status = AssignGlobalSeqnoForIngestedFile(&f, last_seqno + 1); + status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno); + TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run", + &assigned_seqno); + if (assigned_seqno == last_seqno + 1) { consumed_seqno = true; - } else { - status = AssignGlobalSeqnoForIngestedFile(&f, 0); } - if (!status.ok()) { return status; } - edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(), f.fd.GetFileSize(), f.smallest_internal_key(), f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno, @@ -388,15 +388,25 @@ Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables( return status; } -Status ExternalSstFileIngestionJob::AssignLevelForIngestedFile( - SuperVersion* sv, IngestedFileInfo* file_to_ingest, bool* overlap_with_db) { - *overlap_with_db = false; +Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( + SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style, + IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) { + Status status; + *assigned_seqno = 0; + const SequenceNumber last_seqno = versions_->LastSequence(); + if (force_global_seqno) { + *assigned_seqno = last_seqno + 1; + if (compaction_style == kCompactionStyleUniversal) { + file_to_ingest->picked_level = 0; + return status; + } + } + bool overlap_with_db = false; Arena arena; ReadOptions ro; ro.total_order_seek = true; - Status status; int target_level = 0; auto* vstorage = cfd_->current()->storage_info(); for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) { @@ -423,24 +433,46 @@ Status ExternalSstFileIngestionJob::AssignLevelForIngestedFile( if (!status.ok()) { return status; } - if (overlap_with_level) { // We must use L0 or any level higher than `lvl` to be able to overwrite // the keys that we overlap with in this level, We also need to assign // this file a seqno to overwrite the existing keys in level `lvl` - *overlap_with_db = true; + overlap_with_db = true; break; } + + if (compaction_style == kCompactionStyleUniversal && lvl != 0) { + const std::vector& level_files = + vstorage->LevelFiles(lvl); + const SequenceNumber level_largest_seqno = + (*max_element(level_files.begin(), level_files.end(), + [](FileMetaData* f1, FileMetaData* f2) { + return f1->largest_seqno < f2->largest_seqno; + })) + ->largest_seqno; + if (level_largest_seqno != 0) { + *assigned_seqno = level_largest_seqno; + } else { + continue; + } + } + } else if (compaction_style == kCompactionStyleUniversal) { + continue; } // We dont overlap with any keys in this level, but we still need to check // if our file can fit in it - if (IngestedFileFitInLevel(file_to_ingest, lvl)) { target_level = lvl; } } + TEST_SYNC_POINT_CALLBACK( + "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile", + &overlap_with_db); file_to_ingest->picked_level = target_level; + if (overlap_with_db && *assigned_seqno == 0) { + *assigned_seqno = last_seqno + 1; + } return status; } diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 9efa19e9d..20892bed5 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -114,12 +114,14 @@ class ExternalSstFileIngestionJob { // REQUIRES: Mutex held Status IngestedFilesOverlapWithMemtables(SuperVersion* sv, bool* overlap); - // Assign `file_to_ingest` the lowest possible level that it can - // be ingested to. + // Assign `file_to_ingest` the appropriate sequence number and the lowest + // possible level that it can be ingested to according to compaction_style. // REQUIRES: Mutex held - Status AssignLevelForIngestedFile(SuperVersion* sv, - IngestedFileInfo* file_to_ingest, - bool* overlap_with_db); + Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv, + bool force_global_seqno, + CompactionStyle compaction_style, + IngestedFileInfo* file_to_ingest, + SequenceNumber* assigned_seqno); // Set the file global sequence number to `seqno` Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest, diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 4916c04d3..e88b62e64 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -315,8 +315,7 @@ TEST_F(ExternalSSTFileTest, Basic) { ASSERT_EQ(Get(Key(k)), value); } DestroyAndRecreateExternalSSTFilesDir(); - } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | - kSkipFIFOCompaction)); + } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction)); } class SstFileWriterCollector : public TablePropertiesCollector { public: @@ -556,8 +555,7 @@ TEST_F(ExternalSSTFileTest, AddList) { ASSERT_EQ(Get(Key(k)), value); } DestroyAndRecreateExternalSSTFilesDir(); - } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | - kSkipFIFOCompaction)); + } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction)); } TEST_F(ExternalSSTFileTest, AddListAtomicity) { @@ -599,8 +597,7 @@ TEST_F(ExternalSSTFileTest, AddListAtomicity) { ASSERT_EQ(Get(Key(k)), value); } DestroyAndRecreateExternalSSTFilesDir(); - } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | - kSkipFIFOCompaction)); + } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction)); } // This test reporduce a bug that can happen in some cases if the DB started // purging obsolete files when we are adding an external sst file. @@ -831,12 +828,31 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) { fprintf(stderr, "Verified %d values\n", num_files * keys_per_file); DestroyAndRecreateExternalSSTFilesDir(); - } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | - kSkipFIFOCompaction)); + } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction)); } TEST_F(ExternalSSTFileTest, OverlappingRanges) { Random rnd(301); + int picked_level = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "ExternalSstFileIngestionJob::Run", [&picked_level](void* arg) { + ASSERT_TRUE(arg != nullptr); + picked_level = *(static_cast(arg)); + }); + bool need_flush = false; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::IngestExternalFile:NeedFlush", [&need_flush](void* arg) { + ASSERT_TRUE(arg != nullptr); + need_flush = *(static_cast(arg)); + }); + bool overlap_with_db = false; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile", + [&overlap_with_db](void* arg) { + ASSERT_TRUE(arg != nullptr); + overlap_with_db = *(static_cast(arg)); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); do { Options options = CurrentOptions(); DestroyAndReopen(options); @@ -889,15 +905,27 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) { // Insert the generated file s = DeprecatedAddFile({file_name}); - auto it = true_data.lower_bound(Key(range_start)); - if (it != true_data.end() && it->first <= Key(range_end)) { - // This range overlap with data already exist in DB - ASSERT_NOK(s); - failed_add_file++; + if (option_config_ != kUniversalCompaction && + option_config_ != kUniversalCompactionMultiLevel) { + if (it != true_data.end() && it->first <= Key(range_end)) { + // This range overlap with data already exist in DB + ASSERT_NOK(s); + failed_add_file++; + } else { + ASSERT_OK(s); + success_add_file++; + } } else { - ASSERT_OK(s); - success_add_file++; + if ((it != true_data.end() && it->first <= Key(range_end)) || + need_flush || picked_level > 0 || overlap_with_db) { + // This range overlap with data already exist in DB + ASSERT_NOK(s); + failed_add_file++; + } else { + ASSERT_OK(s); + success_add_file++; + } } } @@ -930,8 +958,7 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) { } printf("keys/values verified\n"); DestroyAndRecreateExternalSSTFilesDir(); - } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | - kSkipFIFOCompaction)); + } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction)); } TEST_F(ExternalSSTFileTest, PickedLevel) {