diff --git a/db/db_impl.cc b/db/db_impl.cc index 7f6d12eea..f2fb00e2d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3274,10 +3274,6 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, auto cfh = reinterpret_cast(column_family); ColumnFamilyData* cfd = cfh->cfd(); - if (cfd->NumberLevels() <= 1) { - return Status::NotSupported( - "AddFile requires a database with at least 2 levels"); - } if (file_info->version != 1) { return Status::InvalidArgument("Generated table version is not supported"); } @@ -3324,48 +3320,44 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, WriteThread::Writer w; write_thread_.EnterUnbatched(&w, &mutex_); - // Make sure memtables are empty - if (!cfd->mem()->IsEmpty() || cfd->imm()->NumNotFlushed() > 0) { - // Cannot add the file since the keys in memtable - // will hide the keys in file - status = Status::NotSupported("Memtable is not empty"); + if (!snapshots_.empty()) { + // Check that no snapshots are being held + status = + Status::NotSupported("Cannot add a file while holding snapshots"); } - // Make sure last sequence number is 0, if there are existing files then - // they should have sequence number = 0 - if (status.ok() && versions_->LastSequence() > 0) { - status = Status::NotSupported("Last Sequence number is not zero"); - } - - auto* vstorage = cfd->current()->storage_info(); if (status.ok()) { - // Make sure that the key range in the file we will add does not overlap - // with previously added files - Slice smallest_user_key = meta.smallest.user_key(); - Slice largest_user_key = meta.largest.user_key(); - for (int level = 0; level < vstorage->num_non_empty_levels(); level++) { - if (vstorage->OverlapInLevel(level, &smallest_user_key, - &largest_user_key)) { - status = Status::NotSupported("Cannot add overlapping files"); - break; + // Verify that the added file's key range doesn't overlap with any keys in DB + SuperVersion* sv = cfd->GetSuperVersion()->Ref(); + Arena arena; + ReadOptions ro; + ro.total_order_seek 
= true; + ScopedArenaIterator iter(NewInternalIterator(ro, cfd, sv, &arena)); + + InternalKey range_start(file_info->smallest_key, kMaxSequenceNumber, + kTypeValue); + iter->Seek(range_start.Encode()); + status = iter->status(); + + if (status.ok() && iter->Valid()) { + ParsedInternalKey seek_result; + if (ParseInternalKey(iter->key(), &seek_result)) { + auto* vstorage = cfd->current()->storage_info(); + if (vstorage->InternalComparator()->user_comparator()->Compare( + seek_result.user_key, file_info->largest_key) <= 0) { + status = Status::NotSupported("Cannot add overlapping range"); + } + } else { + status = Status::Corruption("DB have corrupted keys"); } } } if (status.ok()) { - // We add the file to the last level - int target_level = cfd->NumberLevels() - 1; - if (cfd->ioptions()->level_compaction_dynamic_level_bytes == false) { - // If we are using dynamic level compaction we add the file to - // last level with files - target_level = vstorage->num_non_empty_levels() - 1; - if (target_level <= 0) { - target_level = 1; - } - } + // Add file to L0 VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); - edit.AddFile(target_level, meta.fd.GetNumber(), meta.fd.GetPathId(), + edit.AddFile(0, meta.fd.GetNumber(), meta.fd.GetPathId(), meta.fd.GetFileSize(), meta.smallest, meta.largest, meta.smallest_seqno, meta.largest_seqno, meta.marked_for_compaction); diff --git a/db/db_test.cc b/db/db_test.cc index d9d27f346..c50b6c6ba 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -9602,9 +9602,15 @@ TEST_F(DBTest, AddExternalSstFile) { ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); } - // Add file using file info - s = db_->AddFile(&file2_info); - ASSERT_TRUE(s.ok()) << s.ToString(); + // Adding a file while holding a snapshot will fail + const Snapshot* s1 = db_->GetSnapshot(); + if (s1 != nullptr) { + ASSERT_NOK(db_->AddFile(&file2_info)); + db_->ReleaseSnapshot(s1); + } + // We can add the file after releasing the snapshot + ASSERT_OK(db_->AddFile(&file2_info)); + 
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); for (int k = 0; k < 200; k++) { ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); @@ -9624,9 +9630,8 @@ TEST_F(DBTest, AddExternalSstFile) { } ASSERT_NE(db_->GetLatestSequenceNumber(), 0U); - // DB have values in memtable now, we cannot add files anymore - s = db_->AddFile(file5); - ASSERT_FALSE(s.ok()) << s.ToString(); + // Key range of file5 (400 => 499) doesn't overlap with any keys in DB + ASSERT_OK(db_->AddFile(file5)); // Make sure values are correct before and after flush/compaction for (int i = 0; i < 2; i++) { @@ -9637,13 +9642,37 @@ TEST_F(DBTest, AddExternalSstFile) { } ASSERT_EQ(Get(Key(k)), value); } + for (int k = 400; k < 500; k++) { + std::string value = Key(k) + "_val"; + ASSERT_EQ(Get(Key(k)), value); + } ASSERT_OK(Flush()); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); } - // DB sequence number is not zero, cannot add files anymore - s = db_->AddFile(file5); - ASSERT_FALSE(s.ok()) << s.ToString(); + Close(); + options.disable_auto_compactions = true; + Reopen(options); + + // Delete keys in range (400 => 499) + for (int k = 400; k < 500; k++) { + ASSERT_OK(Delete(Key(k))); + } + // We deleted range (400 => 499) but cannot add file5 because + // of the tombstones left in that range + ASSERT_NOK(db_->AddFile(file5)); + + // Compacting the DB will remove the tombstones + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + // Now we can add the file + ASSERT_OK(db_->AddFile(file5)); + + // Verify values of file5 in DB + for (int k = 400; k < 500; k++) { + std::string value = Key(k) + "_val"; + ASSERT_EQ(Get(Key(k)), value); + } } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | kSkipFIFOCompaction)); } @@ -9816,6 +9845,107 @@ TEST_F(DBTest, AddExternalSstFileMultiThreaded) { } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | kSkipFIFOCompaction)); } + +TEST_F(DBTest, AddExternalSstFileOverlappingRanges) { + std::string sst_files_folder = 
test::TmpDir(env_) + "/sst_files/"; + Random rnd(301); + do { + env_->CreateDir(sst_files_folder); + Options options = CurrentOptions(); + DestroyAndReopen(options); + const ImmutableCFOptions ioptions(options); + SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + + printf("Option config = %d\n", option_config_); + std::vector<std::pair<int, int>> key_ranges; + for (int i = 0; i < 500; i++) { + int range_start = rnd.Uniform(20000); + int keys_per_range = 10 + rnd.Uniform(41); + + key_ranges.emplace_back(range_start, range_start + keys_per_range); + } + + int memtable_add = 0; + int success_add_file = 0; + int failed_add_file = 0; + std::map<std::string, std::string> true_data; + for (size_t i = 0; i < key_ranges.size(); i++) { + int range_start = key_ranges[i].first; + int range_end = key_ranges[i].second; + + Status s; + std::string range_val = "range_" + ToString(i); + + // For 20% of ranges we use DB::Put, for 80% we use DB::AddFile + if (i && i % 5 == 0) { + // Use DB::Put to insert range (insert into memtable) + range_val += "_put"; + for (int k = range_start; k <= range_end; k++) { + s = Put(Key(k), range_val); + ASSERT_OK(s); + } + memtable_add++; + } else { + // Use DB::AddFile to insert range + range_val += "_add_file"; + + // Generate the file containing the range + std::string file_name = sst_files_folder + env_->GenerateUniqueId(); + ASSERT_OK(sst_file_writer.Open(file_name)); + for (int k = range_start; k <= range_end; k++) { + s = sst_file_writer.Add(Key(k), range_val); + ASSERT_OK(s); + } + ExternalSstFileInfo file_info; + s = sst_file_writer.Finish(&file_info); + ASSERT_OK(s); + + // Insert the generated file + s = db_->AddFile(&file_info); + + auto it = true_data.lower_bound(Key(range_start)); + if (it != true_data.end() && it->first <= Key(range_end)) { + // This range overlaps with data that already exists in the DB + ASSERT_NOK(s); + failed_add_file++; + } else { + ASSERT_OK(s); + success_add_file++; + } + } + + if (s.ok()) { + // Update true_data map to include the newly 
inserted data + for (int k = range_start; k <= range_end; k++) { + true_data[Key(k)] = range_val; + } + } + + // Flush / Compact the DB + if (i && i % 50 == 0) { + Flush(); + } + if (i && i % 75 == 0) { + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + } + } + + printf( + "Total: %zu ranges\n" + "AddFile()|Success: %d ranges\n" + "AddFile()|RangeConflict: %d ranges\n" + "Put(): %d ranges\n", + key_ranges.size(), success_add_file, failed_add_file, memtable_add); + + // Verify the correctness of the data + for (const auto& kv : true_data) { + ASSERT_EQ(Get(kv.first), kv.second); + } + printf("keys/values verified\n"); + } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | + kSkipFIFOCompaction)); +} + #endif // ROCKSDB_LITE // 1 Create some SST files by inserting K-V pairs into DB diff --git a/db/version_builder.cc b/db/version_builder.cc index 7e6358e5f..6717885b5 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -136,7 +136,10 @@ class VersionBuilder::Rep { auto f2 = level_files[i]; if (level == 0) { assert(level_zero_cmp_(f1, f2)); - assert(f1->largest_seqno > f2->largest_seqno); + assert(f1->largest_seqno > f2->largest_seqno || + // We can have multiple files with seqno = 0 as a result of + // using DB::AddFile() + (f1->largest_seqno == 0 && f2->largest_seqno == 0)); } else { assert(level_nonzero_cmp_(f1, f2)); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index cb194cbaa..a73e28400 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -679,13 +679,11 @@ class DB { // move the file instead of copying it. // // Current Requirements: - // (1) Memtable is empty. - // (2) All existing files (if any) have sequence number = 0. - // (3) Key range in loaded table file don't overlap with existing - // files key ranges. - // (4) No other writes happen during AddFile call, otherwise + // (1) Key range in loaded table file doesn't overlap with + // existing keys or tombstones in DB. 
+ // (2) No other writes happen during AddFile call, otherwise // DB may get corrupted. - // (5) Database have at least 2 levels. + // (3) No snapshots are held. virtual Status AddFile(ColumnFamilyHandle* column_family, const std::string& file_path, bool move_file = false) = 0;