diff --git a/db/compaction.cc b/db/compaction.cc index b74069b6b..cb96676c0 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -187,8 +187,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage, } } - Slice smallest_user_key; - GetBoundaryKeys(vstorage, inputs_, &smallest_user_key, &largest_user_key_); + GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_); } Compaction::~Compaction() { diff --git a/db/compaction.h b/db/compaction.h index c1ec45081..a596f313a 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -219,6 +219,8 @@ class Compaction { output_table_properties_ = std::move(tp); } + Slice GetSmallestUserKey() const { return smallest_user_key_; } + Slice GetLargestUserKey() const { return largest_user_key_; } CompactionReason compaction_reason() { return compaction_reason_; } @@ -294,6 +296,9 @@ class Compaction { // table properties of output files TablePropertiesCollection output_table_properties_; + // smallest user keys in compaction + Slice smallest_user_key_; + // largest user keys in compaction Slice largest_user_key_; diff --git a/db/db_impl.cc b/db/db_impl.cc index 84f70bc1f..35e39ee17 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -68,7 +68,6 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" -#include "rocksdb/sst_file_writer.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "rocksdb/table.h" @@ -2090,6 +2089,7 @@ Status DBImpl::CompactFilesImpl( c->SetInputVersion(version); // deletion compaction currently not allowed in CompactFiles. assert(!c->deletion_compaction()); + running_compactions_.insert(c.get()); SequenceNumber earliest_write_conflict_snapshot; std::vector snapshot_seqs = @@ -2145,6 +2145,8 @@ Status DBImpl::CompactFilesImpl( ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); + running_compactions_.erase(c.get()); + if (status.ok()) { // Done } else if (status.IsShutdownInProgress()) { @@ -3085,6 +3087,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, reinterpret_cast(arg); *made_progress = false; mutex_.AssertHeld(); + TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start"); bool is_manual = (manual_compaction != nullptr); @@ -3199,6 +3202,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } } + if (c != nullptr) { + running_compactions_.insert(c.get()); + } + if (!c) { // Nothing to do LogToBuffer(log_buffer, "Compaction nothing to do"); @@ -3325,6 +3332,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, NotifyOnCompactionCompleted( c->column_family_data(), c.get(), status, compaction_job_stats, job_context->job_id); + running_compactions_.erase(c.get()); } // this will unref its input_version and column_family_data c.reset(); @@ -3766,247 +3774,6 @@ std::vector DBImpl::MultiGet( return stat_list; } -#ifndef ROCKSDB_LITE -Status DBImpl::AddFile(ColumnFamilyHandle* column_family, - const std::string& file_path, bool move_file) { - Status status; - auto cfh = reinterpret_cast(column_family); - ColumnFamilyData* cfd = cfh->cfd(); - - ExternalSstFileInfo file_info; - file_info.file_path = file_path; - status = env_->GetFileSize(file_path, &file_info.file_size); - if (!status.ok()) { - return status; - } - - // Access the file using TableReader to extract - // version, number of entries, smallest user key, largest user key - std::unique_ptr sst_file; - status = env_->NewRandomAccessFile(file_path, &sst_file, env_options_); - if (!status.ok()) { - return status; - } - std::unique_ptr sst_file_reader; - sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file))); - - std::unique_ptr table_reader; - status = cfd->ioptions()->table_factory->NewTableReader( - TableReaderOptions(*cfd->ioptions(), env_options_, - cfd->internal_comparator()), - std::move(sst_file_reader), file_info.file_size, &table_reader); - if (!status.ok()) { - return status; - } - - // Get the external sst file version from table properties - const UserCollectedProperties& user_collected_properties = - table_reader->GetTableProperties()->user_collected_properties; - UserCollectedProperties::const_iterator external_sst_file_version_iter = - user_collected_properties.find(ExternalSstFilePropertyNames::kVersion); - if (external_sst_file_version_iter == user_collected_properties.end()) { - return Status::InvalidArgument("Generated table version not found"); - } - - file_info.version = - DecodeFixed32(external_sst_file_version_iter->second.c_str()); - if (file_info.version == 1) { - // version 1 imply that all sequence numbers in table equal 0 - file_info.sequence_number = 0; - } else { - return Status::InvalidArgument("Generated table version is not supported"); - } - - // Get number of entries in table - file_info.num_entries = table_reader->GetTableProperties()->num_entries; - - ParsedInternalKey key; - std::unique_ptr iter( - table_reader->NewIterator(ReadOptions())); - - // Get first (smallest) key from file - iter->SeekToFirst(); - if (!ParseInternalKey(iter->key(), &key)) { - return Status::Corruption("Generated table have corrupted keys"); - } - if (key.sequence != 0) { - return Status::Corruption("Generated table have non zero sequence number"); - } - file_info.smallest_key = key.user_key.ToString(); - - // Get last (largest) key from file - iter->SeekToLast(); - if (!ParseInternalKey(iter->key(), &key)) { - return Status::Corruption("Generated table have corrupted keys"); - } - if (key.sequence != 0) { - return Status::Corruption("Generated table have non zero sequence number"); - } - file_info.largest_key = key.user_key.ToString(); - - return AddFile(column_family, &file_info, move_file); -} - -Status DBImpl::AddFile(ColumnFamilyHandle* column_family, - const ExternalSstFileInfo* file_info, bool move_file) { - Status status; - auto cfh = reinterpret_cast(column_family); - ColumnFamilyData* cfd = cfh->cfd(); - const uint64_t start_micros = env_->NowMicros(); - - if (file_info->num_entries == 0) { - return Status::InvalidArgument("File contain no entries"); - } - if (file_info->version != 1) { - return Status::InvalidArgument("Generated table version is not supported"); - } - // version 1 imply that file have only Put Operations with Sequence Number = 0 - - FileMetaData meta; - meta.smallest = - InternalKey(file_info->smallest_key, file_info->sequence_number, - ValueType::kTypeValue); - meta.largest = InternalKey(file_info->largest_key, file_info->sequence_number, - ValueType::kTypeValue); - if (!meta.smallest.Valid() || !meta.largest.Valid()) { - return Status::Corruption("Generated table have corrupted keys"); - } - meta.smallest_seqno = file_info->sequence_number; - meta.largest_seqno = file_info->sequence_number; - if (meta.smallest_seqno != 0 || meta.largest_seqno != 0) { - return Status::InvalidArgument( - "Non zero sequence numbers are not supported"); - } - - // Generate a location for the new table - std::list::iterator pending_outputs_inserted_elem; - { - InstrumentedMutexLock l(&mutex_); - pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); - meta.fd = - FileDescriptor(versions_->NewFileNumber(), 0, file_info->file_size); - } - - std::string db_fname = TableFileName( - db_options_.db_paths, meta.fd.GetNumber(), meta.fd.GetPathId()); - - if (move_file) { - status = env_->LinkFile(file_info->file_path, db_fname); - if (status.IsNotSupported()) { - // Original file is on a different FS, use copy instead of hard linking - status = CopyFile(env_, file_info->file_path, db_fname, 0); - } - } else { - status = CopyFile(env_, file_info->file_path, db_fname, 0); - } - TEST_SYNC_POINT("DBImpl::AddFile:FileCopied"); - if (!status.ok()) { - return status; - } - - { - InstrumentedMutexLock l(&mutex_); - const MutableCFOptions mutable_cf_options = - *cfd->GetLatestMutableCFOptions(); - - WriteThread::Writer w; - write_thread_.EnterUnbatched(&w, &mutex_); - - if (!snapshots_.empty()) { - // Check that no snapshots are being held - status = - Status::NotSupported("Cannot add a file while holding snapshots"); - } - - if (status.ok()) { - // Verify that added file key range dont overlap with any keys in DB - SuperVersion* sv = cfd->GetSuperVersion()->Ref(); - Arena arena; - ReadOptions ro; - ro.total_order_seek = true; - ScopedArenaIterator iter(NewInternalIterator(ro, cfd, sv, &arena)); - - InternalKey range_start(file_info->smallest_key, kMaxSequenceNumber, - kTypeValue); - iter->Seek(range_start.Encode()); - status = iter->status(); - - if (status.ok() && iter->Valid()) { - ParsedInternalKey seek_result; - if (ParseInternalKey(iter->key(), &seek_result)) { - auto* vstorage = cfd->current()->storage_info(); - if (vstorage->InternalComparator()->user_comparator()->Compare( - seek_result.user_key, file_info->largest_key) <= 0) { - status = Status::NotSupported("Cannot add overlapping range"); - } - } else { - status = Status::Corruption("DB have corrupted keys"); - } - } - } - - if (status.ok()) { - // Add file to L0 - VersionEdit edit; - edit.SetColumnFamily(cfd->GetID()); - edit.AddFile(0, meta.fd.GetNumber(), meta.fd.GetPathId(), - meta.fd.GetFileSize(), meta.smallest, meta.largest, - meta.smallest_seqno, meta.largest_seqno, - meta.marked_for_compaction); - - status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, - directories_.GetDbDir()); - } - write_thread_.ExitUnbatched(&w); - - if (status.ok()) { - delete InstallSuperVersionAndScheduleWork(cfd, nullptr, - mutable_cf_options); - - // Update internal stats - InternalStats::CompactionStats stats(1); - stats.micros = env_->NowMicros() - start_micros; - stats.bytes_written = meta.fd.GetFileSize(); - stats.num_output_files = 1; - cfd->internal_stats()->AddCompactionStats(0 /* L0 */, stats); - cfd->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE, - meta.fd.GetFileSize()); - } - ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); - } - - if (!status.ok()) { - // We failed to add the file to the database - Status s = env_->DeleteFile(db_fname); - if (!s.ok()) { - Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, - "AddFile() clean up for file %s failed : %s", db_fname.c_str(), - s.ToString().c_str()); - } - } else { - // File was ingested successfully - Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "New file %" PRIu64 " was added to L0 (Size: %.2f MB, " - "entries: %" PRIu64 ")", - meta.fd.GetNumber(), - static_cast(meta.fd.GetFileSize()) / 1048576.0, - file_info->num_entries); - - if (move_file) { - // The file was moved and added successfully, remove original file link - Status s = env_->DeleteFile(file_info->file_path); - if (!s.ok()) { - Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, - "%s was added to DB successfully but failed to remove original " - "file link : %s", - file_info->file_path.c_str(), s.ToString().c_str()); - } - } - } - return status; -} -#endif // ROCKSDB_LITE - Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family_name, ColumnFamilyHandle** handle) { diff --git a/db/db_impl.h b/db/db_impl.h index cceed0321..3c432582e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -623,6 +623,11 @@ class DBImpl : public DB { Status WaitForFlushMemTable(ColumnFamilyData* cfd); #ifndef ROCKSDB_LITE + // Finds the lowest level in the DB that the ingested file can be added to + // REQUIRES: mutex_ held + int PickLevelForIngestedFile(ColumnFamilyData* cfd, + const ExternalSstFileInfo* file_info); + Status CompactFilesImpl( const CompactionOptions& compact_options, ColumnFamilyData* cfd, Version* version, const std::vector& input_file_names, @@ -912,6 +917,10 @@ class DBImpl : public DB { // The options to access storage files const EnvOptions env_options_; + // A set of compactions that are running right now + // REQUIRES: mutex held + std::unordered_set running_compactions_; + #ifndef ROCKSDB_LITE WalManager wal_manager_; #endif // ROCKSDB_LITE diff --git a/db/db_impl_add_file.cc b/db/db_impl_add_file.cc new file mode 100644 index 000000000..b76c47064 --- /dev/null +++ b/db/db_impl_add_file.cc @@ -0,0 +1,321 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "db/db_impl.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include + +#include "db/builder.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/sst_file_writer.h" +#include "table/table_builder.h" +#include "util/file_reader_writer.h" +#include "util/file_util.h" +#include "util/sync_point.h" + +namespace rocksdb { +#ifndef ROCKSDB_LITE +Status DBImpl::AddFile(ColumnFamilyHandle* column_family, + const std::string& file_path, bool move_file) { + Status status; + auto cfh = reinterpret_cast(column_family); + ColumnFamilyData* cfd = cfh->cfd(); + + ExternalSstFileInfo file_info; + file_info.file_path = file_path; + status = env_->GetFileSize(file_path, &file_info.file_size); + if (!status.ok()) { + return status; + } + + // Access the file using TableReader to extract + // version, number of entries, smallest user key, largest user key + std::unique_ptr sst_file; + status = env_->NewRandomAccessFile(file_path, &sst_file, env_options_); + if (!status.ok()) { + return status; + } + std::unique_ptr sst_file_reader; + sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file))); + + std::unique_ptr table_reader; + status = cfd->ioptions()->table_factory->NewTableReader( + TableReaderOptions(*cfd->ioptions(), env_options_, + cfd->internal_comparator()), + std::move(sst_file_reader), file_info.file_size, &table_reader); + if (!status.ok()) { + return status; + } + + // Get the external sst file version from table properties + const UserCollectedProperties& user_collected_properties = + table_reader->GetTableProperties()->user_collected_properties; + UserCollectedProperties::const_iterator external_sst_file_version_iter = + user_collected_properties.find(ExternalSstFilePropertyNames::kVersion); + if (external_sst_file_version_iter == user_collected_properties.end()) { + return Status::InvalidArgument("Generated table version not found"); + } + + file_info.version = + DecodeFixed32(external_sst_file_version_iter->second.c_str()); + if (file_info.version == 1) { + // version 1 imply that all sequence numbers in table equal 0 + file_info.sequence_number = 0; + } else { + return Status::InvalidArgument("Generated table version is not supported"); + } + + // Get number of entries in table + file_info.num_entries = table_reader->GetTableProperties()->num_entries; + + ParsedInternalKey key; + std::unique_ptr iter( + table_reader->NewIterator(ReadOptions())); + + // Get first (smallest) key from file + iter->SeekToFirst(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("Generated table have corrupted keys"); + } + if (key.sequence != 0) { + return Status::Corruption("Generated table have non zero sequence number"); + } + file_info.smallest_key = key.user_key.ToString(); + + // Get last (largest) key from file + iter->SeekToLast(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("Generated table have corrupted keys"); + } + if (key.sequence != 0) { + return Status::Corruption("Generated table have non zero sequence number"); + } + file_info.largest_key = key.user_key.ToString(); + + return AddFile(column_family, &file_info, move_file); +} + +Status DBImpl::AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_info, bool move_file) { + Status status; + auto cfh = reinterpret_cast(column_family); + ColumnFamilyData* cfd = cfh->cfd(); + const uint64_t start_micros = env_->NowMicros(); + + if (file_info->num_entries == 0) { + return Status::InvalidArgument("File contain no entries"); + } + if (file_info->version != 1) { + return Status::InvalidArgument("Generated table version is not supported"); + } + // version 1 imply that file have only Put Operations with Sequence Number = 0 + + FileMetaData meta; + meta.smallest = + InternalKey(file_info->smallest_key, file_info->sequence_number, + ValueType::kTypeValue); + meta.largest = InternalKey(file_info->largest_key, file_info->sequence_number, + ValueType::kTypeValue); + if (!meta.smallest.Valid() || !meta.largest.Valid()) { + return Status::Corruption("Generated table have corrupted keys"); + } + meta.smallest_seqno = file_info->sequence_number; + meta.largest_seqno = file_info->sequence_number; + if (meta.smallest_seqno != 0 || meta.largest_seqno != 0) { + return Status::InvalidArgument( + "Non zero sequence numbers are not supported"); + } + + // Generate a location for the new table + std::list::iterator pending_outputs_inserted_elem; + { + InstrumentedMutexLock l(&mutex_); + pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); + meta.fd = + FileDescriptor(versions_->NewFileNumber(), 0, file_info->file_size); + } + + std::string db_fname = TableFileName( + db_options_.db_paths, meta.fd.GetNumber(), meta.fd.GetPathId()); + + if (move_file) { + status = env_->LinkFile(file_info->file_path, db_fname); + if (status.IsNotSupported()) { + // Original file is on a different FS, use copy instead of hard linking + status = CopyFile(env_, file_info->file_path, db_fname, 0); + } + } else { + status = CopyFile(env_, file_info->file_path, db_fname, 0); + } + TEST_SYNC_POINT("DBImpl::AddFile:FileCopied"); + if (!status.ok()) { + return status; + } + + // The level the file will be ingested into + int target_level = 0; + { + InstrumentedMutexLock l(&mutex_); + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); + + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + + if (!snapshots_.empty()) { + // Check that no snapshots are being held + status = + Status::NotSupported("Cannot add a file while holding snapshots"); + } + + if (status.ok()) { + // Verify that added file key range dont overlap with any keys in DB + SuperVersion* sv = cfd->GetSuperVersion()->Ref(); + Arena arena; + ReadOptions ro; + ro.total_order_seek = true; + ScopedArenaIterator iter(NewInternalIterator(ro, cfd, sv, &arena)); + + InternalKey range_start(file_info->smallest_key, kMaxSequenceNumber, + kTypeValue); + iter->Seek(range_start.Encode()); + status = iter->status(); + + if (status.ok() && iter->Valid()) { + ParsedInternalKey seek_result; + if (ParseInternalKey(iter->key(), &seek_result)) { + auto* vstorage = cfd->current()->storage_info(); + if (vstorage->InternalComparator()->user_comparator()->Compare( + seek_result.user_key, file_info->largest_key) <= 0) { + status = Status::NotSupported("Cannot add overlapping range"); + } + } else { + status = Status::Corruption("DB have corrupted keys"); + } + } + } + + if (status.ok()) { + // Add file to the lowest possible level + target_level = PickLevelForIngestedFile(cfd, file_info); + VersionEdit edit; + edit.SetColumnFamily(cfd->GetID()); + edit.AddFile(target_level, meta.fd.GetNumber(), meta.fd.GetPathId(), + meta.fd.GetFileSize(), meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno, + meta.marked_for_compaction); + + status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, + directories_.GetDbDir()); + } + write_thread_.ExitUnbatched(&w); + + if (status.ok()) { + delete InstallSuperVersionAndScheduleWork(cfd, nullptr, + mutable_cf_options); + + // Update internal stats + InternalStats::CompactionStats stats(1); + stats.micros = env_->NowMicros() - start_micros; + stats.bytes_written = meta.fd.GetFileSize(); + stats.num_output_files = 1; + cfd->internal_stats()->AddCompactionStats(target_level, stats); + cfd->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE, + meta.fd.GetFileSize()); + } + ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); + } + + if (!status.ok()) { + // We failed to add the file to the database + Status s = env_->DeleteFile(db_fname); + if (!s.ok()) { + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "AddFile() clean up for file %s failed : %s", db_fname.c_str(), + s.ToString().c_str()); + } + } else { + // File was ingested successfully + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "New file %" PRIu64 + " was added to L%d (Size: %.2f MB, " + "entries: %" PRIu64 ")", + meta.fd.GetNumber(), target_level, + static_cast(meta.fd.GetFileSize()) / 1048576.0, + file_info->num_entries); + + if (move_file) { + // The file was moved and added successfully, remove original file link + Status s = env_->DeleteFile(file_info->file_path); + if (!s.ok()) { + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "%s was added to DB successfully but failed to remove original " + "file link : %s", + file_info->file_path.c_str(), s.ToString().c_str()); + } + } + } + return status; +} + +// Finds the lowest level in the DB that the ingested file can be added to +int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd, + const ExternalSstFileInfo* file_info) { + mutex_.AssertHeld(); + + int target_level = 0; + auto* vstorage = cfd->current()->storage_info(); + auto* ucmp = vstorage->InternalComparator()->user_comparator(); + + Slice file_smallest_user_key(file_info->smallest_key); + Slice file_largest_user_key(file_info->largest_key); + + for (int lvl = cfd->NumberLevels() - 1; lvl >= vstorage->base_level(); + lvl--) { + if (vstorage->OverlapInLevel(lvl, &file_smallest_user_key, + &file_largest_user_key) == false) { + // Make sure that the file dont overlap with the output of any + // compaction running right now + Slice compaction_smallest_user_key; + Slice compaction_largest_user_key; + bool overlap_with_compaction_output = false; + for (Compaction* c : running_compactions_) { + if (c->column_family_data()->GetID() != cfd->GetID() || + c->output_level() != lvl) { + continue; + } + + compaction_smallest_user_key = c->GetSmallestUserKey(); + compaction_largest_user_key = c->GetLargestUserKey(); + + if (ucmp->Compare(file_smallest_user_key, + compaction_largest_user_key) <= 0 && + ucmp->Compare(file_largest_user_key, + compaction_smallest_user_key) >= 0) { + overlap_with_compaction_output = true; + break; + } + } + + if (overlap_with_compaction_output == false) { + // Level lvl is the lowest level that dont have any files with key + // range overlapping with our file key range and no compactions + // planning to add overlapping files in it. + target_level = lvl; + break; + } + } + } + + return target_level; +} +#endif // ROCKSDB_LITE + +} // namespace rocksdb diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 1a7e98163..5de7b6d3b 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -1243,6 +1243,215 @@ TEST_F(DBSSTTest, AddExternalSstFileOverlappingRanges) { kSkipFIFOCompaction)); } +TEST_F(DBSSTTest, AddExternalSstFilePickedLevel) { + std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/"; + env_->CreateDir(sst_files_folder); + Options options = CurrentOptions(); + options.disable_auto_compactions = false; + options.level0_file_num_compaction_trigger = 4; + options.num_levels = 4; + options.env = env_; + DestroyAndReopen(options); + + std::vector> file_to_keys; + + // File 0 will go to last level (L3) + file_to_keys.push_back({1, 10}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + EXPECT_EQ(FilesPerLevel(), "0,0,0,1"); + + // File 1 will go to level L2 (since it overlap with file 0 in L3) + file_to_keys.push_back({2, 9}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + EXPECT_EQ(FilesPerLevel(), "0,0,1,1"); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"DBSSTTest::AddExternalSstFilePickedLevel:0", + "BackgroundCallCompaction:0"}, + {"DBImpl::BackgroundCompaction:Start", + "DBSSTTest::AddExternalSstFilePickedLevel:1"}, + {"DBSSTTest::AddExternalSstFilePickedLevel:2", + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Flush 4 files containing the same keys + for (int i = 0; i < 4; i++) { + ASSERT_OK(Put(Key(3), Key(3) + "put")); + ASSERT_OK(Put(Key(8), Key(8) + "put")); + ASSERT_OK(Flush()); + } + + // Wait for BackgroundCompaction() to be called + TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevel:0"); + TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevel:1"); + + EXPECT_EQ(FilesPerLevel(), "4,0,1,1"); + + // This file overlaps with file 0 (L3), file 1 (L2) and the + // output of compaction going to L1 + file_to_keys.push_back({4, 7}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + EXPECT_EQ(FilesPerLevel(), "5,0,1,1"); + + // This file does not overlap with any file or with the running compaction + file_to_keys.push_back({9000, 9001}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + EXPECT_EQ(FilesPerLevel(), "5,0,1,2"); + + // Hold compaction from finishing + TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevel:2"); + + dbfull()->TEST_WaitForCompact(); + EXPECT_EQ(FilesPerLevel(), "1,1,1,2"); + + for (size_t file_id = 0; file_id < file_to_keys.size(); file_id++) { + for (auto& key_id : file_to_keys[file_id]) { + std::string k = Key(key_id); + std::string v = k + ToString(file_id); + if (key_id == 3 || key_id == 8) { + v = k + "put"; + } + + ASSERT_EQ(Get(k), v); + } + } + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBSSTTest, AddExternalSstFilePickedLevelDynamic) { + std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/"; + env_->CreateDir(sst_files_folder); + Options options = CurrentOptions(); + options.disable_auto_compactions = false; + options.level0_file_num_compaction_trigger = 4; + options.level_compaction_dynamic_level_bytes = true; + options.num_levels = 4; + options.env = env_; + DestroyAndReopen(options); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"DBSSTTest::AddExternalSstFilePickedLevelDynamic:0", + "BackgroundCallCompaction:0"}, + {"DBImpl::BackgroundCompaction:Start", + "DBSSTTest::AddExternalSstFilePickedLevelDynamic:1"}, + {"DBSSTTest::AddExternalSstFilePickedLevelDynamic:2", + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Flush 4 files containing the same keys + for (int i = 0; i < 4; i++) { + for (int k = 20; k <= 30; k++) { + ASSERT_OK(Put(Key(k), Key(k) + "put")); + } + for (int k = 50; k <= 60; k++) { + ASSERT_OK(Put(Key(k), Key(k) + "put")); + } + ASSERT_OK(Flush()); + } + + // Wait for BackgroundCompaction() to be called + TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevelDynamic:0"); + TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevelDynamic:1"); + std::vector> file_to_keys; + + // This file overlaps with the output of the compaction (going to L3) + // so the file will be added to L0 since L3 is the base level + file_to_keys.push_back({31, 32, 33, 34}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + EXPECT_EQ(FilesPerLevel(), "5"); + + // This file does not overlap with the current running compactiong + file_to_keys.push_back({9000, 9001}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + EXPECT_EQ(FilesPerLevel(), "5,0,0,1"); + + // Hold compaction from finishing + TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevelDynamic:2"); + + // Output of the compaction will go to L3 + dbfull()->TEST_WaitForCompact(); + EXPECT_EQ(FilesPerLevel(), "1,0,0,2"); + + Close(); + options.disable_auto_compactions = true; + Reopen(options); + + file_to_keys.push_back({1, 15, 19}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + ASSERT_EQ(FilesPerLevel(), "1,0,0,3"); + + file_to_keys.push_back({1000, 1001, 1002}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + ASSERT_EQ(FilesPerLevel(), "1,0,0,4"); + + file_to_keys.push_back({500, 600, 700}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + ASSERT_EQ(FilesPerLevel(), "1,0,0,5"); + + // File 5 overlaps with file 2 (L3 / base level) + file_to_keys.push_back({2, 10}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + ASSERT_EQ(FilesPerLevel(), "2,0,0,5"); + + // File 6 overlaps with file 2 (L3 / base level) and file 5 (L0) + file_to_keys.push_back({3, 9}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + ASSERT_EQ(FilesPerLevel(), "3,0,0,5"); + + // Verify data in files + for (size_t file_id = 0; file_id < file_to_keys.size(); file_id++) { + for (auto& key_id : file_to_keys[file_id]) { + std::string k = Key(key_id); + std::string v = k + ToString(file_id); + + ASSERT_EQ(Get(k), v); + } + } + + // Write range [5 => 10] to L0 + for (int i = 5; i <= 10; i++) { + std::string k = Key(i); + std::string v = k + "put"; + ASSERT_OK(Put(k, v)); + } + ASSERT_OK(Flush()); + ASSERT_EQ(FilesPerLevel(), "4,0,0,5"); + + // File 7 overlaps with file 4 (L3) + file_to_keys.push_back({650, 651, 652}); + ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), + file_to_keys.size() - 1)); + ASSERT_EQ(FilesPerLevel(), "5,0,0,5"); + + for (size_t file_id = 0; file_id < file_to_keys.size(); file_id++) { + for (auto& key_id : file_to_keys[file_id]) { + std::string k = Key(key_id); + std::string v = k + ToString(file_id); + if (key_id >= 5 && key_id <= 10) { + v = k + "put"; + } + + ASSERT_EQ(Get(k), v); + } + } + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + #endif // ROCKSDB_LITE // 1 Create some SST files by inserting K-V pairs into DB diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 649daaa3a..be31ffba2 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1075,6 +1075,35 @@ std::vector DBTestBase::ListTableFiles(Env* env, } #ifndef ROCKSDB_LITE + +Status DBTestBase::GenerateAndAddExternalFile(const Options options, + std::vector keys, + size_t file_id) { + std::string file_path = + test::TmpDir(env_) + "/sst_files/" + ToString(file_id); + SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); + + Status s = sst_file_writer.Open(file_path); + if (!s.ok()) { + return s; + } + for (auto& entry : keys) { + std::string k = Key(entry); + std::string v = k + ToString(file_id); + s = sst_file_writer.Add(k, v); + if (!s.ok()) { + return s; + } + } + s = sst_file_writer.Finish(); + + if (s.ok()) { + s = db_->AddFile(file_path); + } + + return s; +} + uint64_t DBTestBase::GetNumberOfSstFilesForColumnFamily( DB* db, std::string column_family_name) { std::vector metadata; diff --git a/db/db_test_util.h b/db/db_test_util.h index 25580f3aa..81b46ea5f 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -39,6 +39,7 @@ #include "rocksdb/filter_policy.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" +#include "rocksdb/sst_file_writer.h" #include "rocksdb/statistics.h" #include "rocksdb/table.h" #include "rocksdb/utilities/checkpoint.h" @@ -797,6 +798,9 @@ class DBTestBase : public testing::Test { std::vector ListTableFiles(Env* env, const std::string& path); #ifndef ROCKSDB_LITE + Status GenerateAndAddExternalFile(const Options options, + std::vector keys, size_t file_id); + uint64_t GetNumberOfSstFilesForColumnFamily(DB* db, std::string column_family_name); #endif // ROCKSDB_LITE diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 4b9ff8fc2..5338f88fd 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -800,9 +800,10 @@ class DB { // Current Requirements: // (1) Key range in loaded table file don't overlap with // existing keys or tombstones in DB. - // (2) No other writes happen during AddFile call, otherwise - // DB may get corrupted. - // (3) No snapshots are held. + // (2) No snapshots are held. + // + // Notes: We will try to ingest the file to the lowest possible level + // even if the file compression dont match the level compression virtual Status AddFile(ColumnFamilyHandle* column_family, const std::string& file_path, bool move_file = false) = 0; diff --git a/src.mk b/src.mk index da61cc17d..67933d177 100644 --- a/src.mk +++ b/src.mk @@ -16,6 +16,7 @@ LIB_SOURCES = \ db/db_impl_debug.cc \ db/db_impl_readonly.cc \ db/db_impl_experimental.cc \ + db/db_impl_add_file.cc \ db/db_info_dumper.cc \ db/db_iter.cc \ db/experimental.cc \