diff --git a/CMakeLists.txt b/CMakeLists.txt index ea19aba25..677fadc08 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -252,11 +252,11 @@ set(SOURCES db/db_impl.cc db/db_impl_debug.cc db/db_impl_experimental.cc - db/db_impl_add_file.cc db/db_impl_readonly.cc db/db_info_dumper.cc db/db_iter.cc db/event_helpers.cc + db/external_sst_file_ingestion_job.cc db/experimental.cc db/filename.cc db/file_indexer.cc diff --git a/db/db_impl.cc b/db/db_impl.cc index a788979f1..dfc31d307 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -39,6 +39,7 @@ #include "db/db_iter.h" #include "db/dbformat.h" #include "db/event_helpers.h" +#include "db/external_sst_file_ingestion_job.h" #include "db/filename.h" #include "db/flush_job.h" #include "db/forward_iterator.h" @@ -346,7 +347,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) next_job_id_(1), has_unpersisted_data_(false), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), - num_running_addfile_(0), + num_running_ingest_file_(0), #ifndef ROCKSDB_LITE wal_manager_(immutable_db_options_, env_options_), #endif // ROCKSDB_LITE @@ -2143,8 +2144,8 @@ Status DBImpl::CompactFiles( InstrumentedMutexLock l(&mutex_); // This call will unlock/lock the mutex to wait for current running - // AddFile() calls to finish. - WaitForAddFile(); + // IngestExternalFile() calls to finish. 
+ WaitForIngestFile(); s = CompactFilesImpl(compact_options, cfd, sv->current, input_file_names, output_level, @@ -2899,7 +2900,8 @@ InternalIterator* DBImpl::NewInternalIterator( } Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, - const FlushOptions& flush_options) { + const FlushOptions& flush_options, + bool writes_stopped) { Status s; { WriteContext context; @@ -2911,12 +2913,17 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } WriteThread::Writer w; - write_thread_.EnterUnbatched(&w, &mutex_); + if (!writes_stopped) { + write_thread_.EnterUnbatched(&w, &mutex_); + } // SwitchMemtable() will release and reacquire mutex // during execution s = SwitchMemtable(cfd, &context); - write_thread_.ExitUnbatched(&w); + + if (!writes_stopped) { + write_thread_.ExitUnbatched(&w); + } cfd->imm()->FlushRequested(); @@ -3295,8 +3302,8 @@ void DBImpl::BackgroundCallCompaction(void* arg) { InstrumentedMutexLock l(&mutex_); // This call will unlock/lock the mutex to wait for current running - // AddFile() calls to finish. - WaitForAddFile(); + // IngestExternalFile() calls to finish. + WaitForIngestFile(); num_running_compactions_++; @@ -3704,8 +3711,8 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) { } bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) { - if (num_running_addfile_ > 0) { - // We need to wait for other AddFile() calls to finish + if (num_running_ingest_file_ > 0) { + // We need to wait for other IngestExternalFile() calls to finish // before running a manual compaction. 
return true; } @@ -6364,6 +6371,119 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, return Status::OK(); }
+
+// Ingests the SST files listed in `external_files` into `column_family`.
+//
+// The work is driven through an ExternalSstFileIngestionJob: Prepare() runs
+// without the DB mutex; then, with the mutex held and the write thread
+// entered unbatched, the memtable is flushed if NeedsFlush() asks for it,
+// Run() builds the job's edit and LogAndApply() installs it.
+// CompactFiles() and background compactions wait (WaitForIngestFile) until
+// no ingestion is running.
+//
+// Returns the first non-OK status encountered, otherwise OK.
+Status DBImpl::IngestExternalFile(
+    ColumnFamilyHandle* column_family,
+    const std::vector<std::string>& external_files,
+    const IngestExternalFileOptions& ingestion_options) {
+  Status status;
+  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  auto cfd = cfh->cfd();
+
+  ExternalSstFileIngestionJob ingestion_job(env_, versions_.get(), cfd,
+                                            immutable_db_options_, env_options_,
+                                            &snapshots_, ingestion_options);
+
+  // Make sure that bg cleanup won't delete the files that we are ingesting
+  std::list<uint64_t>::iterator pending_output_elem;
+  {
+    InstrumentedMutexLock l(&mutex_);
+    pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
+  }
+
+  status = ingestion_job.Prepare(external_files);
+  if (!status.ok()) {
+    // Every other exit path releases the pending-output entry captured
+    // above; do the same here so a failed Prepare() does not leave it behind.
+    InstrumentedMutexLock l(&mutex_);
+    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
+    return status;
+  }
+
+  {
+    // Lock db mutex
+    InstrumentedMutexLock l(&mutex_);
+    TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
+    num_running_ingest_file_++;
+
+    // Stop writes to the DB
+    WriteThread::Writer w;
+    write_thread_.EnterUnbatched(&w, &mutex_);
+
+    // Figure out if we need to flush the memtable first
+    bool need_flush = false;
+    status = ingestion_job.NeedsFlush(&need_flush);
+    if (status.ok() && need_flush) {
+      // We already entered the write thread above, so tell FlushMemTable()
+      // not to enter it again (writes_stopped = true).
+      mutex_.Unlock();
+      status = FlushMemTable(cfd, FlushOptions(), true /* writes_stopped */);
+      mutex_.Lock();
+    }
+
+    // Run the ingestion job
+    if (status.ok()) {
+      status = ingestion_job.Run();
+    }
+
+    // Install job edit [Mutex will be unlocked here]
+    auto mutable_cf_options = cfd->GetLatestMutableCFOptions();
+    if (status.ok()) {
+      status =
+          versions_->LogAndApply(cfd, *mutable_cf_options, ingestion_job.edit(),
+                                 &mutex_, directories_.GetDbDir());
+    }
+    if (status.ok()) {
+      delete InstallSuperVersionAndScheduleWork(cfd, nullptr,
+                                                *mutable_cf_options);
+    }
+
+    // Resume writes to the DB
+    write_thread_.ExitUnbatched(&w);
+
+    // Update stats
+    if (status.ok()) {
+      ingestion_job.UpdateStats();
+    }
+
+    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
+
+    num_running_ingest_file_--;
+    if (num_running_ingest_file_ == 0) {
+      bg_cv_.SignalAll();
+    }
+
+    TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
+  }
+  // mutex_ is unlocked here
+
+  // Cleanup
+  ingestion_job.Cleanup(status);
+
+  return status;
+}
+
+// Blocks until no IngestExternalFile() call is running. bg_cv_.Wait() is used
+// for the waiting, and IngestExternalFile() signals bg_cv_ when the running
+// count drops to zero.
+// REQUIRES: mutex_ held
+void DBImpl::WaitForIngestFile() {
+  mutex_.AssertHeld();
+  while (num_running_ingest_file_ > 0) {
+    bg_cv_.Wait();
+  }
+}
+
 #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/db/db_impl.h b/db/db_impl.h index c7fd9c20e..0847954d8 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -260,13 +260,11 @@ class DBImpl : public DB { bool cache_only, SequenceNumber* seq, bool* found_record_for_key); - using DB::AddFile; - virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::vector& file_info_list, - bool move_file, bool skip_snapshot_check) override; - virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::vector& file_path_list, - bool move_file, bool skip_snapshot_check) override; + using DB::IngestExternalFile; + virtual Status IngestExternalFile( + ColumnFamilyHandle* column_family, + const std::vector& external_files, + const IngestExternalFileOptions& ingestion_options) override; #endif // ROCKSDB_LITE @@ -650,20 +648,13 @@ class DBImpl : public DB { Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); // Force current memtable contents to be flushed. - Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options); + Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, + bool writes_stopped = false); // Wait for memtable flushed Status WaitForFlushMemTable(ColumnFamilyData* cfd); #ifndef ROCKSDB_LITE - // Finds the lowest level in the DB that the ingested file can be added to - // REQUIRES: mutex_ held - int PickLevelForIngestedFile(ColumnFamilyData* cfd, - const ExternalSstFileInfo& file_info); - - // Wait for current AddFile() calls to finish.
- // REQUIRES: mutex_ held - void WaitForAddFile(); Status CompactFilesImpl(const CompactionOptions& compact_options, ColumnFamilyData* cfd, Version* version, @@ -671,14 +662,14 @@ class DBImpl : public DB { const int output_level, int output_path_id, JobContext* job_context, LogBuffer* log_buffer); - Status ReadExternalSstFileInfo(ColumnFamilyHandle* column_family, - const std::string& file_path, - ExternalSstFileInfo* file_info); + // Wait for current IngestExternalFile() calls to finish. + // REQUIRES: mutex_ held + void WaitForIngestFile(); #else - // AddFile is not supported in ROCKSDB_LITE so this function + // IngestExternalFile is not supported in ROCKSDB_LITE so this function // will be no-op - void WaitForAddFile() {} + void WaitForIngestFile() {} #endif // ROCKSDB_LITE ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name); @@ -752,7 +743,7 @@ class DBImpl : public DB { // * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases // (i.e. whenever a flush is done, even if it didn't make any progress) // * whenever there is an error in background purge, flush or compaction - // * whenever num_running_addfile_ goes to 0. + // * whenever num_running_ingest_file_ goes to 0. InstrumentedCondVar bg_cv_; uint64_t logfile_number_; std::deque @@ -994,9 +985,9 @@ class DBImpl : public DB { // The options to access storage files const EnvOptions env_options_; - // Number of running AddFile() calls. + // Number of running IngestExternalFile() calls. // REQUIRES: mutex held - int num_running_addfile_; + int num_running_ingest_file_; #ifndef ROCKSDB_LITE WalManager wal_manager_; diff --git a/db/db_impl_add_file.cc b/db/db_impl_add_file.cc deleted file mode 100644 index c38215a76..000000000 --- a/db/db_impl_add_file.cc +++ /dev/null @@ -1,430 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
-// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#include "db/db_impl.h" - -#ifndef __STDC_FORMAT_MACROS -#define __STDC_FORMAT_MACROS -#endif - -#include - -#include "rocksdb/db.h" -#include "rocksdb/env.h" -#include "rocksdb/sst_file_writer.h" - -#include "db/builder.h" -#include "table/sst_file_writer_collectors.h" -#include "table/table_builder.h" -#include "util/file_reader_writer.h" -#include "util/file_util.h" -#include "util/sync_point.h" - -namespace rocksdb { - -#ifndef ROCKSDB_LITE - -Status DBImpl::ReadExternalSstFileInfo(ColumnFamilyHandle* column_family, - const std::string& file_path, - ExternalSstFileInfo* file_info) { - Status status; - auto cfh = reinterpret_cast(column_family); - auto cfd = cfh->cfd(); - - file_info->file_path = file_path; - status = env_->GetFileSize(file_path, &file_info->file_size); - if (!status.ok()) { - return status; - } - - // Access the file using TableReader to extract - // version, number of entries, smallest user key, largest user key - std::unique_ptr sst_file; - status = env_->NewRandomAccessFile(file_path, &sst_file, env_options_); - if (!status.ok()) { - return status; - } - std::unique_ptr sst_file_reader; - sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file))); - - std::unique_ptr table_reader; - status = cfd->ioptions()->table_factory->NewTableReader( - TableReaderOptions(*cfd->ioptions(), env_options_, - cfd->internal_comparator()), - std::move(sst_file_reader), file_info->file_size, &table_reader); - if (!status.ok()) { - return status; - } - - // Get the external sst file version from table properties - const UserCollectedProperties& user_collected_properties = - table_reader->GetTableProperties()->user_collected_properties; - UserCollectedProperties::const_iterator external_sst_file_version_iter = - 
user_collected_properties.find(ExternalSstFilePropertyNames::kVersion); - if (external_sst_file_version_iter == user_collected_properties.end()) { - return Status::InvalidArgument("Generated table version not found"); - } - - file_info->version = - DecodeFixed32(external_sst_file_version_iter->second.c_str()); - if (file_info->version == 2) { - // version 2 imply that we have global sequence number - - // TODO(tec): Implement version 2 ingestion - file_info->sequence_number = 0; - } else if (file_info->version == 1) { - // version 1 imply that all sequence numbers in table equal 0 - file_info->sequence_number = 0; - } else { - return Status::InvalidArgument("Generated table version is not supported"); - } - // Get number of entries in table - file_info->num_entries = table_reader->GetTableProperties()->num_entries; - - ParsedInternalKey key; - std::unique_ptr iter( - table_reader->NewIterator(ReadOptions())); - - // Get first (smallest) key from file - iter->SeekToFirst(); - if (!ParseInternalKey(iter->key(), &key)) { - return Status::Corruption("Generated table have corrupted keys"); - } - if (key.sequence != 0) { - return Status::Corruption("Generated table have non zero sequence number"); - } - file_info->smallest_key = key.user_key.ToString(); - - // Get last (largest) key from file - iter->SeekToLast(); - if (!ParseInternalKey(iter->key(), &key)) { - return Status::Corruption("Generated table have corrupted keys"); - } - if (key.sequence != 0) { - return Status::Corruption("Generated table have non zero sequence number"); - } - file_info->largest_key = key.user_key.ToString(); - - return Status::OK(); -} - -Status DBImpl::AddFile(ColumnFamilyHandle* column_family, - const std::vector& file_path_list, - bool move_file, bool skip_snapshot_check) { - Status status; - auto num_files = file_path_list.size(); - if (num_files == 0) { - return Status::InvalidArgument("The list of files is empty"); - } - - std::vector file_info_list(num_files); - for (size_t i = 0; i < 
num_files; i++) { - status = ReadExternalSstFileInfo(column_family, file_path_list[i], - &file_info_list[i]); - if (!status.ok()) { - return status; - } - } - return AddFile(column_family, file_info_list, move_file, skip_snapshot_check); -} - -Status DBImpl::AddFile(ColumnFamilyHandle* column_family, - const std::vector& file_info_list, - bool move_file, bool skip_snapshot_check) { - Status status; - auto cfh = reinterpret_cast(column_family); - ColumnFamilyData* cfd = cfh->cfd(); - const Comparator* user_cmp = cfd->internal_comparator().user_comparator(); - - auto num_files = file_info_list.size(); - if (num_files == 0) { - return Status::InvalidArgument("The list of files is empty"); - } - - // Verify that passed files dont have overlapping ranges - if (num_files > 1) { - std::vector sorted_file_info_list(num_files); - for (size_t i = 0; i < num_files; i++) { - sorted_file_info_list[i] = &file_info_list[i]; - } - - std::sort(sorted_file_info_list.begin(), sorted_file_info_list.end(), - [&user_cmp, &file_info_list](const ExternalSstFileInfo* info1, - const ExternalSstFileInfo* info2) { - return user_cmp->Compare(info1->smallest_key, - info2->smallest_key) < 0; - }); - - for (size_t i = 0; i < num_files - 1; i++) { - if (user_cmp->Compare(sorted_file_info_list[i]->largest_key, - sorted_file_info_list[i + 1]->smallest_key) >= 0) { - return Status::NotSupported("Files have overlapping ranges"); - } - } - } - - std::vector micro_list(num_files, 0); - std::vector meta_list(num_files); - for (size_t i = 0; i < num_files; i++) { - StopWatch sw(env_, nullptr, 0, µ_list[i], false); - if (file_info_list[i].num_entries == 0) { - return Status::InvalidArgument("File contain no entries"); - } - - if (file_info_list[i].version == 2) { - // version 2 imply that file have only Put Operations - // with global Sequence Number - - // TODO(tec): Implement changing file global sequence number - } else if (file_info_list[i].version == 1) { - // version 1 imply that file have only Put 
Operations - // with Sequence Number = 0 - } else { - // Unknown version ! - return Status::InvalidArgument( - "Generated table version is not supported"); - } - - meta_list[i].smallest = - InternalKey(file_info_list[i].smallest_key, - file_info_list[i].sequence_number, ValueType::kTypeValue); - meta_list[i].largest = - InternalKey(file_info_list[i].largest_key, - file_info_list[i].sequence_number, ValueType::kTypeValue); - if (!meta_list[i].smallest.Valid() || !meta_list[i].largest.Valid()) { - return Status::Corruption("Generated table have corrupted keys"); - } - meta_list[i].smallest_seqno = file_info_list[i].sequence_number; - meta_list[i].largest_seqno = file_info_list[i].sequence_number; - if (meta_list[i].smallest_seqno != 0 || meta_list[i].largest_seqno != 0) { - return Status::InvalidArgument( - "Non zero sequence numbers are not supported"); - } - } - - std::vector::iterator> pending_outputs_inserted_elem_list( - num_files); - // Generate locations for the new tables - { - InstrumentedMutexLock l(&mutex_); - for (size_t i = 0; i < num_files; i++) { - StopWatch sw(env_, nullptr, 0, µ_list[i], false); - pending_outputs_inserted_elem_list[i] = - CaptureCurrentFileNumberInPendingOutputs(); - meta_list[i].fd = FileDescriptor(versions_->NewFileNumber(), 0, - file_info_list[i].file_size); - } - } - - // Copy/Move external files into DB - std::vector db_fname_list(num_files); - size_t j = 0; - for (; j < num_files; j++) { - StopWatch sw(env_, nullptr, 0, µ_list[j], false); - db_fname_list[j] = - TableFileName(immutable_db_options_.db_paths, - meta_list[j].fd.GetNumber(), meta_list[j].fd.GetPathId()); - if (move_file) { - status = env_->LinkFile(file_info_list[j].file_path, db_fname_list[j]); - if (status.IsNotSupported()) { - // Original file is on a different FS, use copy instead of hard linking - status = - CopyFile(env_, file_info_list[j].file_path, db_fname_list[j], 0); - } - } else { - status = CopyFile(env_, file_info_list[j].file_path, db_fname_list[j], 
0); - } - TEST_SYNC_POINT("DBImpl::AddFile:FileCopied"); - if (!status.ok()) { - for (size_t i = 0; i < j; i++) { - Status s = env_->DeleteFile(db_fname_list[i]); - if (!s.ok()) { - Log(InfoLogLevel::WARN_LEVEL, immutable_db_options_.info_log, - "AddFile() clean up for file %s failed : %s", - db_fname_list[i].c_str(), s.ToString().c_str()); - } - } - return status; - } - } - - { - InstrumentedMutexLock l(&mutex_); - TEST_SYNC_POINT("DBImpl::AddFile:MutexLock"); - - const MutableCFOptions mutable_cf_options = - *cfd->GetLatestMutableCFOptions(); - - WriteThread::Writer w; - write_thread_.EnterUnbatched(&w, &mutex_); - - num_running_addfile_++; - - if (!skip_snapshot_check && !snapshots_.empty()) { - // Check that no snapshots are being held - status = - Status::NotSupported("Cannot add a file while holding snapshots"); - } - - if (status.ok()) { - // Verify that added file key range dont overlap with any keys in DB - SuperVersion* sv = cfd->GetSuperVersion()->Ref(); - Arena arena; - ReadOptions ro; - ro.total_order_seek = true; - ScopedArenaIterator iter(NewInternalIterator(ro, cfd, sv, &arena)); - - for (size_t i = 0; i < num_files; i++) { - StopWatch sw(env_, nullptr, 0, µ_list[i], false); - InternalKey range_start(file_info_list[i].smallest_key, - kMaxSequenceNumber, kValueTypeForSeek); - iter->Seek(range_start.Encode()); - status = iter->status(); - - if (status.ok() && iter->Valid()) { - ParsedInternalKey seek_result; - if (ParseInternalKey(iter->key(), &seek_result)) { - if (user_cmp->Compare(seek_result.user_key, - file_info_list[i].largest_key) <= 0) { - status = Status::NotSupported("Cannot add overlapping range"); - break; - } - } else { - status = Status::Corruption("DB have corrupted keys"); - break; - } - } - } - } - - // The levels that the files will be ingested into - std::vector target_level_list(num_files, 0); - if (status.ok()) { - VersionEdit edit; - edit.SetColumnFamily(cfd->GetID()); - for (size_t i = 0; i < num_files; i++) { - StopWatch 
sw(env_, nullptr, 0, µ_list[i], false); - // Add file to the lowest possible level - target_level_list[i] = PickLevelForIngestedFile(cfd, file_info_list[i]); - edit.AddFile(target_level_list[i], meta_list[i].fd.GetNumber(), - meta_list[i].fd.GetPathId(), meta_list[i].fd.GetFileSize(), - meta_list[i].smallest, meta_list[i].largest, - meta_list[i].smallest_seqno, meta_list[i].largest_seqno, - meta_list[i].marked_for_compaction); - } - status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, - directories_.GetDbDir()); - } - write_thread_.ExitUnbatched(&w); - - if (status.ok()) { - delete InstallSuperVersionAndScheduleWork(cfd, nullptr, - mutable_cf_options); - - // Update internal stats for new ingested files - uint64_t total_keys = 0; - uint64_t total_l0_files = 0; - for (size_t i = 0; i < num_files; i++) { - InternalStats::CompactionStats stats(1); - stats.micros = micro_list[i]; - stats.bytes_written = meta_list[i].fd.GetFileSize(); - stats.num_output_files = 1; - cfd->internal_stats()->AddCompactionStats(target_level_list[i], stats); - cfd->internal_stats()->AddCFStats( - InternalStats::BYTES_INGESTED_ADD_FILE, - meta_list[i].fd.GetFileSize()); - total_keys += file_info_list[i].num_entries; - if (target_level_list[i] == 0) { - total_l0_files += 1; - } - Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log, - "[AddFile] External SST file %s was ingested in L%d with path %s\n", - file_info_list[i].file_path.c_str(), target_level_list[i], - db_fname_list[i].c_str()); - } - cfd->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL, - total_keys); - cfd->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL, - num_files); - cfd->internal_stats()->AddCFStats( - InternalStats::INGESTED_LEVEL0_NUM_FILES_TOTAL, total_l0_files); - } - - for (size_t i = 0; i < num_files; i++) { - ReleaseFileNumberFromPendingOutputs( - pending_outputs_inserted_elem_list[i]); - } - - num_running_addfile_--; - if (num_running_addfile_ == 0) 
{ - bg_cv_.SignalAll(); - } - TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock"); - } // mutex_ is unlocked here; - - if (!status.ok()) { - // We failed to add the files to the database - for (size_t i = 0; i < num_files; i++) { - Status s = env_->DeleteFile(db_fname_list[i]); - if (!s.ok()) { - Log(InfoLogLevel::WARN_LEVEL, immutable_db_options_.info_log, - "AddFile() clean up for file %s failed : %s", - db_fname_list[i].c_str(), s.ToString().c_str()); - } - } - } else if (status.ok() && move_file) { - // The files were moved and added successfully, remove original file links - for (size_t i = 0; i < num_files; i++) { - Status s = env_->DeleteFile(file_info_list[i].file_path); - if (!s.ok()) { - Log(InfoLogLevel::WARN_LEVEL, immutable_db_options_.info_log, - "%s was added to DB successfully but failed to remove original " - "file " - "link : %s", - file_info_list[i].file_path.c_str(), s.ToString().c_str()); - } - } - } - return status; -} - -// Finds the lowest level in the DB that the ingested file can be added to -int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd, - const ExternalSstFileInfo& file_info) { - mutex_.AssertHeld(); - - int target_level = 0; - auto* vstorage = cfd->current()->storage_info(); - Slice file_smallest_user_key(file_info.smallest_key); - Slice file_largest_user_key(file_info.largest_key); - - for (int lvl = cfd->NumberLevels() - 1; lvl >= vstorage->base_level(); - lvl--) { - // Make sure that the file fits in Level `lvl` and dont overlap with - // the output of any compaction running right now. - if (vstorage->OverlapInLevel(lvl, &file_smallest_user_key, - &file_largest_user_key) == false && - cfd->RangeOverlapWithCompaction(file_smallest_user_key, - file_largest_user_key, lvl) == false) { - // Level lvl is the lowest level that dont have any files with key - // range overlapping with our file key range and no compactions - // planning to add overlapping files in it. 
- target_level = lvl; - break; - } - } - - return target_level; -} - -void DBImpl::WaitForAddFile() { - mutex_.AssertHeld(); - while (num_running_addfile_ > 0) { - bg_cv_.Wait(); - } -} -#endif // ROCKSDB_LITE - -} // namespace rocksdb diff --git a/db/db_test.cc b/db/db_test.cc index 04ec66bb8..a65334de7 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2646,15 +2646,11 @@ class ModelDB : public DB { } #ifndef ROCKSDB_LITE - using DB::AddFile; - virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::vector& file_info_list, - bool move_file, bool skip_snapshot_check) override { - return Status::NotSupported("Not implemented."); - } - virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::vector& file_path_list, - bool move_file, bool skip_snapshot_check) override { + using DB::IngestExternalFile; + virtual Status IngestExternalFile( + ColumnFamilyHandle* column_family, + const std::vector& external_files, + const IngestExternalFileOptions& options) override { return Status::NotSupported("Not implemented."); } diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 2be5a1b47..16147b603 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1077,7 +1077,7 @@ std::vector DBTestBase::ListTableFiles(Env* env, } void DBTestBase::VerifyDBFromMap(std::map true_data, - size_t* total_reads_res) { + size_t* total_reads_res, bool tailing_iter) { size_t total_reads = 0; for (auto& kv : true_data) { @@ -1126,36 +1126,38 @@ void DBTestBase::VerifyDBFromMap(std::map true_data, delete iter; } + if (tailing_iter) { #ifndef ROCKSDB_LITE - // Tailing iterator - int iter_cnt = 0; - ReadOptions ro; - ro.tailing = true; - ro.total_order_seek = true; - Iterator* iter = db_->NewIterator(ro); - - // Verify ForwardIterator::Next() - iter_cnt = 0; - auto data_iter = true_data.begin(); - for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { - ASSERT_EQ(iter->key().ToString(), data_iter->first); - ASSERT_EQ(iter->value().ToString(), 
data_iter->second); - iter_cnt++; - total_reads++; - } - ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / " - << true_data.size(); - - // Verify ForwardIterator::Seek() - for (auto kv : true_data) { - iter->Seek(kv.first); - ASSERT_EQ(kv.first, iter->key().ToString()); - ASSERT_EQ(kv.second, iter->value().ToString()); - total_reads++; - } + // Tailing iterator + int iter_cnt = 0; + ReadOptions ro; + ro.tailing = true; + ro.total_order_seek = true; + Iterator* iter = db_->NewIterator(ro); - delete iter; + // Verify ForwardIterator::Next() + iter_cnt = 0; + auto data_iter = true_data.begin(); + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { + ASSERT_EQ(iter->key().ToString(), data_iter->first); + ASSERT_EQ(iter->value().ToString(), data_iter->second); + iter_cnt++; + total_reads++; + } + ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / " + << true_data.size(); + + // Verify ForwardIterator::Seek() + for (auto kv : true_data) { + iter->Seek(kv.first); + ASSERT_EQ(kv.first, iter->key().ToString()); + ASSERT_EQ(kv.second, iter->value().ToString()); + total_reads++; + } + + delete iter; #endif // ROCKSDB_LITE + } if (total_reads_res) { *total_reads_res = total_reads; diff --git a/db/db_test_util.h b/db/db_test_util.h index 6801f4bbe..8166c2873 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -822,7 +822,8 @@ class DBTestBase : public testing::Test { std::vector ListTableFiles(Env* env, const std::string& path); void VerifyDBFromMap(std::map true_data, - size_t* total_reads_res = nullptr); + size_t* total_reads_res = nullptr, + bool tailing_iter = false); #ifndef ROCKSDB_LITE uint64_t GetNumberOfSstFilesForColumnFamily(DB* db, diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc new file mode 100644 index 000000000..a2dbad026 --- /dev/null +++ b/db/external_sst_file_ingestion_job.cc @@ -0,0 +1,514 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "db/external_sst_file_ingestion_job.h" + +#define __STDC_FORMAT_MACROS + +#include +#include +#include +#include + +#include "db/version_edit.h" +#include "table/merger.h" +#include "table/scoped_arena_iterator.h" +#include "table/sst_file_writer_collectors.h" +#include "table/table_builder.h" +#include "util/file_reader_writer.h" +#include "util/file_util.h" +#include "util/stop_watch.h" +#include "util/sync_point.h" + +namespace rocksdb {
+
+// Reads the metadata of every file in `external_files_paths`, validates the
+// batch (non-empty, no two files with overlapping user-key ranges, every file
+// has entries and valid boundary keys) and then hard-links or copies each
+// file into the DB directory under a newly allocated file number.
+// If a link/copy fails, the files already placed in the DB directory are
+// deleted again and the failing status is returned.
+Status ExternalSstFileIngestionJob::Prepare(
+    const std::vector<std::string>& external_files_paths) {
+  Status status;
+
+  // Read the information of files we are ingesting
+  for (const std::string& file_path : external_files_paths) {
+    IngestedFileInfo file_to_ingest;
+    status = GetIngestedFileInfo(file_path, &file_to_ingest);
+    if (!status.ok()) {
+      return status;
+    }
+    // file_to_ingest is not used again in this iteration; move, don't copy.
+    files_to_ingest_.push_back(std::move(file_to_ingest));
+  }
+
+  const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
+  auto num_files = files_to_ingest_.size();
+  if (num_files == 0) {
+    return Status::InvalidArgument("The list of files is empty");
+  } else if (num_files > 1) {
+    // Verify that passed files don't have overlapping ranges
+    autovector<const IngestedFileInfo*> sorted_files;
+    for (size_t i = 0; i < num_files; i++) {
+      sorted_files.push_back(&files_to_ingest_[i]);
+    }
+
+    std::sort(
+        sorted_files.begin(), sorted_files.end(),
+        [&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
+          return ucmp->Compare(info1->smallest_user_key,
+                               info2->smallest_user_key) < 0;
+        });
+
+    for (size_t i = 0; i < num_files - 1; i++) {
+      if (ucmp->Compare(sorted_files[i]->largest_user_key,
+                        sorted_files[i + 1]->smallest_user_key) >= 0) {
+        return Status::NotSupported("Files have overlapping ranges");
+      }
+    }
+  }
+
+  for (IngestedFileInfo& f : files_to_ingest_) {
+    if (f.num_entries == 0) {
+      return Status::InvalidArgument("File contain no entries");
+    }
+
+    if (!f.smallest_internal_key().Valid() ||
+        !f.largest_internal_key().Valid()) {
+      return Status::Corruption("Generated table have corrupted keys");
+    }
+  }
+
+  // Copy/Move external files into DB
+  for (IngestedFileInfo& f : files_to_ingest_) {
+    f.fd = FileDescriptor(versions_->NewFileNumber(), 0, f.file_size);
+
+    const std::string& path_outside_db = f.external_file_path;
+    const std::string path_inside_db =
+        TableFileName(db_options_.db_paths, f.fd.GetNumber(), f.fd.GetPathId());
+
+    if (ingestion_options_.move_files) {
+      status = env_->LinkFile(path_outside_db, path_inside_db);
+      if (status.IsNotSupported()) {
+        // Original file is on a different FS, use copy instead of hard linking
+        status = CopyFile(env_, path_outside_db, path_inside_db, 0);
+      }
+    } else {
+      status = CopyFile(env_, path_outside_db, path_inside_db, 0);
+    }
+    TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
+    if (!status.ok()) {
+      break;
+    }
+    f.internal_file_path = path_inside_db;
+  }
+
+  if (!status.ok()) {
+    // We failed, remove all files that we copied into the db
+    for (IngestedFileInfo& f : files_to_ingest_) {
+      // internal_file_path is only set after a successful link/copy and
+      // files are handled in order, so the first empty path marks where
+      // the loop above stopped.
+      if (f.internal_file_path.empty()) {
+        break;
+      }
+      Status s = env_->DeleteFile(f.internal_file_path);
+      if (!s.ok()) {
+        Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
+            "AddFile() clean up for file %s failed : %s",
+            f.internal_file_path.c_str(), s.ToString().c_str());
+      }
+    }
+  }
+
+  return status;
+}
+
+// Sets *flush_needed from IngestedFilesOverlapWithMemtables() on the current
+// SuperVersion. Returns InvalidArgument if a flush is needed while
+// ingestion_options_.allow_blocking_flush is false.
+Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed) {
+  SuperVersion* super_version = cfd_->GetSuperVersion();
+  Status status =
+      IngestedFilesOverlapWithMemtables(super_version, flush_needed);
+
+  if (status.ok() && *flush_needed &&
+      !ingestion_options_.allow_blocking_flush) {
+    status = Status::InvalidArgument("External file requires flush");
+  }
+  return status;
+}
+
+// Assigns a level and a global sequence number to every file and adds the
+// files to edit_. A file gets last_seqno + 1 when it overlaps with the DB or
+// when snapshot_consistency is set and snapshots exist; otherwise it gets 0.
+// The version set's last sequence is advanced only if last_seqno + 1 was
+// handed out.
+Status ExternalSstFileIngestionJob::Run() {
+  Status status;
+#ifndef NDEBUG
+  // We should never run the job with a memtable that is overlapping
+  // with the files we are ingesting
+  bool need_flush = false;
+  status = NeedsFlush(&need_flush);
+  assert(status.ok() && need_flush == false);
+#endif
+
+  bool consumed_seqno = false;
+  bool force_global_seqno = false;
+  const SequenceNumber last_seqno = versions_->LastSequence();
+  if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
+    // We need to assign a global sequence number to all the files even
+    // if they don't overlap with any ranges since we have snapshots
+    force_global_seqno = true;
+  }
+
+  SuperVersion* super_version = cfd_->GetSuperVersion();
+  edit_.SetColumnFamily(cfd_->GetID());
+  // The levels that the files will be ingested into
+  for (IngestedFileInfo& f : files_to_ingest_) {
+    bool overlap_with_db = false;
+    status = AssignLevelForIngestedFile(super_version, &f, &overlap_with_db);
+    if (!status.ok()) {
+      return status;
+    }
+
+    if (overlap_with_db || force_global_seqno) {
+      status = AssignGlobalSeqnoForIngestedFile(&f, last_seqno + 1);
+      consumed_seqno = true;
+    } else {
+      status = AssignGlobalSeqnoForIngestedFile(&f, 0);
+    }
+
+    if (!status.ok()) {
+      return status;
+    }
+
+    edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
+                  f.fd.GetFileSize(), f.smallest_internal_key(),
+                  f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno,
+                  false);
+  }
+
+  if (consumed_seqno) {
+    versions_->SetLastSequence(last_seqno + 1);
+  }
+
+  return status;
+}
+
+// Adds per-file compaction stats and the ingestion counters to the column
+// family's internal stats and logs where each file ended up.
+void ExternalSstFileIngestionJob::UpdateStats() {
+  // Update internal stats for new ingested files
+  uint64_t total_keys = 0;
+  uint64_t total_l0_files = 0;
+  uint64_t total_time = env_->NowMicros() - job_start_time_;
+  for (IngestedFileInfo& f : files_to_ingest_) {
+    InternalStats::CompactionStats stats(1);
+    stats.micros = total_time;
+    stats.bytes_written = f.fd.GetFileSize();
+    stats.num_output_files = 1;
+    cfd_->internal_stats()->AddCompactionStats(f.picked_level, stats);
+    cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,
+                                       f.fd.GetFileSize());
+    total_keys += f.num_entries;
+    if (f.picked_level == 0) {
+      total_l0_files += 1;
+    }
+    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
+        "[AddFile] External SST file %s was ingested in L%d with path %s "
+        "(global_seqno=%" PRIu64 ")\n",
+        f.external_file_path.c_str(), f.picked_level,
+        f.internal_file_path.c_str(), f.assigned_seqno);
+  }
+  cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL,
+                                     total_keys);
+  cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL,
+                                     files_to_ingest_.size());
+  cfd_->internal_stats()->AddCFStats(
+      InternalStats::INGESTED_LEVEL0_NUM_FILES_TOTAL, total_l0_files);
+}
+
+// On failure, deletes the copies placed inside the DB directory. On success
+// with move_files set, deletes the original external files instead.
+// Deletion failures are only logged.
+void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
+  if (!status.ok()) {
+    // We failed to add the files to the database
+    // remove all the files we copied
+    for (IngestedFileInfo& f : files_to_ingest_) {
+      Status s = env_->DeleteFile(f.internal_file_path);
+      if (!s.ok()) {
+        Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
+            "AddFile() clean up for file %s failed : %s",
+            f.internal_file_path.c_str(), s.ToString().c_str());
+      }
+    }
+  } else if (ingestion_options_.move_files) {
+    // The files were moved and added successfully, remove original file links
+    for (IngestedFileInfo& f : files_to_ingest_) {
+      Status s = env_->DeleteFile(f.external_file_path);
+      if (!s.ok()) {
+        Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
+            "%s was added to DB successfully but failed to remove original "
+            "file link : %s",
+            f.external_file_path.c_str(), s.ToString().c_str());
+      }
+    }
+  }
+}
+
+Status ExternalSstFileIngestionJob::GetIngestedFileInfo( + const std::string& external_file, IngestedFileInfo* file_to_ingest) { + file_to_ingest->external_file_path = external_file; + + // Get external file size + Status status = env_->GetFileSize(external_file, &file_to_ingest->file_size); + if (!status.ok()) { +
return status; + } + + // Create TableReader for external file + std::unique_ptr table_reader; + std::unique_ptr sst_file; + std::unique_ptr sst_file_reader; + + status = env_->NewRandomAccessFile(external_file, &sst_file, env_options_); + if (!status.ok()) { + return status; + } + sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file))); + + status = cfd_->ioptions()->table_factory->NewTableReader( + TableReaderOptions(*cfd_->ioptions(), env_options_, + cfd_->internal_comparator()), + std::move(sst_file_reader), file_to_ingest->file_size, &table_reader); + if (!status.ok()) { + return status; + } + + // Get the external file properties + auto props = table_reader->GetTableProperties(); + const auto& uprops = props->user_collected_properties; + + // Get table version + auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion); + if (version_iter == uprops.end()) { + return Status::Corruption("External file version not found"); + } + file_to_ingest->version = DecodeFixed32(version_iter->second.c_str()); + + auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno); + if (file_to_ingest->version == 2) { + // version 2 imply that we have global sequence number + if (seqno_iter == uprops.end()) { + return Status::Corruption( + "External file global sequence number not found"); + } + + // Set the global sequence number + file_to_ingest->original_seqno = DecodeFixed64(seqno_iter->second.c_str()); + file_to_ingest->global_seqno_offset = props->properties_offsets.at( + ExternalSstFilePropertyNames::kGlobalSeqno); + + if (file_to_ingest->global_seqno_offset == 0) { + return Status::Corruption("Was not able to find file global seqno field"); + } + } else { + return Status::InvalidArgument("external file version is not supported"); + } + // Get number of entries in table + file_to_ingest->num_entries = props->num_entries; + + ParsedInternalKey key; + std::unique_ptr iter( + table_reader->NewIterator(ReadOptions())); + + // Get first 
(smallest) key from file + iter->SeekToFirst(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + if (key.sequence != 0) { + return Status::Corruption("external file have non zero sequence number"); + } + file_to_ingest->smallest_user_key = key.user_key.ToString(); + + // Get last (largest) key from file + iter->SeekToLast(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + if (key.sequence != 0) { + return Status::Corruption("external file have non zero sequence number"); + } + file_to_ingest->largest_user_key = key.user_key.ToString(); + + return status; +} + +Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables( + SuperVersion* sv, bool* overlap) { + // Create an InternalIterator over all memtables + Arena arena; + ReadOptions ro; + ro.total_order_seek = true; + MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(), &arena); + merge_iter_builder.AddIterator(sv->mem->NewIterator(ro, &arena)); + sv->imm->AddIterators(ro, &merge_iter_builder); + ScopedArenaIterator memtable_iter(merge_iter_builder.Finish()); + + Status status; + *overlap = false; + for (IngestedFileInfo& f : files_to_ingest_) { + status = + IngestedFileOverlapWithIteratorRange(&f, memtable_iter.get(), overlap); + if (!status.ok() || *overlap == true) { + break; + } + } + + return status; +} + +Status ExternalSstFileIngestionJob::AssignLevelForIngestedFile( + SuperVersion* sv, IngestedFileInfo* file_to_ingest, bool* overlap_with_db) { + *overlap_with_db = false; + + Arena arena; + ReadOptions ro; + ro.total_order_seek = true; + + Status status; + int target_level = 0; + auto* vstorage = cfd_->current()->storage_info(); + for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) { + if (lvl > 0 && lvl < vstorage->base_level()) { + continue; + } + + if (vstorage->NumLevelFiles(lvl) > 0) { + bool overlap_with_level = false; + 
MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(), + &arena); + sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder, + lvl); + ScopedArenaIterator level_iter(merge_iter_builder.Finish()); + + status = IngestedFileOverlapWithIteratorRange( + file_to_ingest, level_iter.get(), &overlap_with_level); + if (!status.ok()) { + return status; + } + + if (overlap_with_level) { + // We must use L0 or any level higher than `lvl` to be able to overwrite + // the keys that we overlap with in this level, We also need to assign + // this file a seqno to overwrite the existing keys in level `lvl` + *overlap_with_db = true; + break; + } + } + + // We dont overlap with any keys in this level, but we still need to check + // if our file can fit in it + + if (IngestedFileFitInLevel(file_to_ingest, lvl)) { + target_level = lvl; + } + } + file_to_ingest->picked_level = target_level; + return status; +} + +Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile( + IngestedFileInfo* file_to_ingest, SequenceNumber seqno) { + if (file_to_ingest->original_seqno == seqno) { + // This file already have the correct global seqno + return Status::OK(); + } else if (!ingestion_options_.allow_global_seqno) { + return Status::InvalidArgument("Global seqno is required, but disabled"); + } else if (file_to_ingest->global_seqno_offset == 0) { + return Status::InvalidArgument( + "Trying to set global seqno for a file that dont have a global seqno " + "field"); + } + + std::unique_ptr rwfile; + Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path, + &rwfile, env_options_); + if (!status.ok()) { + return status; + } + + // Write the new seqno in the global sequence number field in the file + std::string seqno_val; + PutFixed64(&seqno_val, seqno); + status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val); + if (status.ok()) { + file_to_ingest->assigned_seqno = seqno; + } + return status; +} + +Status 
ExternalSstFileIngestionJob::IngestedFileOverlapWithIteratorRange( + const IngestedFileInfo* file_to_ingest, InternalIterator* iter, + bool* overlap) { + auto* vstorage = cfd_->current()->storage_info(); + auto* ucmp = vstorage->InternalComparator()->user_comparator(); + InternalKey range_start(file_to_ingest->smallest_user_key, kMaxSequenceNumber, + kValueTypeForSeek); + iter->Seek(range_start.Encode()); + if (!iter->status().ok()) { + return iter->status(); + } + + *overlap = false; + if (iter->Valid()) { + ParsedInternalKey seek_result; + if (!ParseInternalKey(iter->key(), &seek_result)) { + return Status::Corruption("DB have corrupted keys"); + } + + if (ucmp->Compare(seek_result.user_key, file_to_ingest->largest_user_key) <= + 0) { + *overlap = true; + } + } + + return iter->status(); +} + +bool ExternalSstFileIngestionJob::IngestedFileFitInLevel( + const IngestedFileInfo* file_to_ingest, int level) { + if (level == 0) { + // Files can always fit in L0 + return true; + } + + auto* vstorage = cfd_->current()->storage_info(); + Slice file_smallest_user_key(file_to_ingest->smallest_user_key); + Slice file_largest_user_key(file_to_ingest->largest_user_key); + + if (vstorage->OverlapInLevel(level, &file_smallest_user_key, + &file_largest_user_key)) { + // File overlap with another files in this level, we cannot + // add it to this level + return false; + } + if (cfd_->RangeOverlapWithCompaction(file_smallest_user_key, + file_largest_user_key, level)) { + // File overlap with a running compaction output that will be stored + // in this level, we cannot add this file to this level + return false; + } + + // File did not overlap with level files, our compaction output + return true; +} + +} // namespace rocksdb diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h new file mode 100644 index 000000000..8e65ae3b2 --- /dev/null +++ b/db/external_sst_file_ingestion_job.h @@ -0,0 +1,143 @@ +// Copyright (c) 2011-present, Facebook, Inc. 
All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#include +#include +#include + +#include "db/column_family.h" +#include "db/dbformat.h" +#include "db/internal_stats.h" +#include "db/snapshot_impl.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/sst_file_writer.h" +#include "util/autovector.h" +#include "util/db_options.h" + +namespace rocksdb { + +struct IngestedFileInfo { + // External file path + std::string external_file_path; + // Smallest user key in external file + std::string smallest_user_key; + // Largest user key in external file + std::string largest_user_key; + // Sequence number for keys in external file + SequenceNumber original_seqno; + // Offset of the global sequence number field in the file, will + // be zero if version is 1 (global seqno is not supported) + size_t global_seqno_offset; + // External file size + uint64_t file_size; + // total number of keys in external file + uint64_t num_entries; + // Version of external file + int version; + + // FileDescriptor for the file inside the DB + FileDescriptor fd; + // file path that we picked for file inside the DB + std::string internal_file_path = ""; + // Global sequence number that we picked for the file inside the DB + SequenceNumber assigned_seqno = 0; + // Level inside the DB we picked for the external file. 
+ int picked_level = 0; + + InternalKey smallest_internal_key() const { + return InternalKey(smallest_user_key, assigned_seqno, + ValueType::kTypeValue); + } + + InternalKey largest_internal_key() const { + return InternalKey(largest_user_key, assigned_seqno, ValueType::kTypeValue); + } +}; + +class ExternalSstFileIngestionJob { + public: + ExternalSstFileIngestionJob( + Env* env, VersionSet* versions, ColumnFamilyData* cfd, + const ImmutableDBOptions& db_options, const EnvOptions& env_options, + SnapshotList* db_snapshots, + const IngestExternalFileOptions& ingestion_options) + : env_(env), + versions_(versions), + cfd_(cfd), + db_options_(db_options), + env_options_(env_options), + db_snapshots_(db_snapshots), + ingestion_options_(ingestion_options), + job_start_time_(env_->NowMicros()) {} + + // Prepare the job by copying external files into the DB. + Status Prepare(const std::vector& external_files_paths); + + // Check if we need to flush the memtable before running the ingestion job + // This will be true if the files we are ingesting are overlapping with any + // key range in the memtable. + // REQUIRES: Mutex held + Status NeedsFlush(bool* flush_needed); + + // Will execute the ingestion job and prepare edit() to be applied. + // REQUIRES: Mutex held + Status Run(); + + // Update column family stats. + // REQUIRES: Mutex held + void UpdateStats(); + + // Cleanup after successfull/failed job + void Cleanup(const Status& status); + + VersionEdit* edit() { return &edit_; } + + private: + // Open the external file and populate `file_to_ingest` with all the + // external information we need to ingest this file. + Status GetIngestedFileInfo(const std::string& external_file, + IngestedFileInfo* file_to_ingest); + + // Check if the files we are ingesting overlap with any memtable. 
+ // REQUIRES: Mutex held + Status IngestedFilesOverlapWithMemtables(SuperVersion* sv, bool* overlap); + + // Assign `file_to_ingest` the lowest possible level that it can + // be ingested to. + // REQUIRES: Mutex held + Status AssignLevelForIngestedFile(SuperVersion* sv, + IngestedFileInfo* file_to_ingest, + bool* overlap_with_db); + + // Set the file global sequence number to `seqno` + Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest, + SequenceNumber seqno); + + // Check if `file_to_ingest` key range overlap with the range `iter` represent + // REQUIRES: Mutex held + Status IngestedFileOverlapWithIteratorRange( + const IngestedFileInfo* file_to_ingest, InternalIterator* iter, + bool* overlap); + + // Check if `file_to_ingest` can fit in level `level` + // REQUIRES: Mutex held + bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest, + int level); + + Env* env_; + VersionSet* versions_; + ColumnFamilyData* cfd_; + const ImmutableDBOptions& db_options_; + const EnvOptions& env_options_; + SnapshotList* db_snapshots_; + autovector files_to_ingest_; + const IngestExternalFileOptions& ingestion_options_; + VersionEdit edit_; + uint64_t job_start_time_; +}; + +} // namespace rocksdb diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 87f370008..2762249c9 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -24,8 +24,32 @@ class ExternalSSTFileTest : public DBTestBase { env_->CreateDir(sst_files_dir_); } - Status GenerateAndAddExternalFile(const Options options, - std::vector keys, size_t file_id) { + Status GenerateAndAddExternalFile( + const Options options, + std::vector> data, int file_id = -1, + bool allow_global_seqno = false, bool sort_data = false, + std::map* true_data = nullptr) { + // Generate a file id if not provided + if (file_id == -1) { + file_id = last_file_id_ + 1; + last_file_id_++; + } + + // Sort data if asked to do so + if (sort_data) { + 
std::sort(data.begin(), data.end(), + [&](const std::pair& e1, + const std::pair& e2) { + return options.comparator->Compare(e1.first, e2.first) < 0; + }); + auto uniq_iter = std::unique( + data.begin(), data.end(), + [&](const std::pair& e1, + const std::pair& e2) { + return options.comparator->Compare(e1.first, e2.first) == 0; + }); + data.resize(uniq_iter - data.begin()); + } std::string file_path = sst_files_dir_ + ToString(file_id); SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); @@ -33,33 +57,75 @@ class ExternalSSTFileTest : public DBTestBase { if (!s.ok()) { return s; } - for (auto& entry : keys) { - std::string k = Key(entry); - std::string v = k + ToString(file_id); - s = sst_file_writer.Add(k, v); + for (auto& entry : data) { + s = sst_file_writer.Add(entry.first, entry.second); if (!s.ok()) { + sst_file_writer.Finish(); return s; } } s = sst_file_writer.Finish(); if (s.ok()) { - s = db_->AddFile(std::vector(1, file_path)); + IngestExternalFileOptions ifo; + ifo.allow_global_seqno = allow_global_seqno; + s = db_->IngestExternalFile({file_path}, ifo); + } + + if (s.ok() && true_data) { + for (auto& entry : data) { + (*true_data)[entry.first] = entry.second; + } } return s; } + Status GenerateAndAddExternalFile( + const Options options, std::vector> data, + int file_id = -1, bool allow_global_seqno = false, bool sort_data = false, + std::map* true_data = nullptr) { + std::vector> file_data; + for (auto& entry : data) { + file_data.emplace_back(Key(entry.first), entry.second); + } + return GenerateAndAddExternalFile(options, file_data, file_id, + allow_global_seqno, sort_data, true_data); + } + + Status GenerateAndAddExternalFile( + const Options options, std::vector keys, int file_id = -1, + bool allow_global_seqno = false, bool sort_data = false, + std::map* true_data = nullptr) { + std::vector> file_data; + for (auto& k : keys) { + file_data.emplace_back(Key(k), Key(k) + ToString(file_id)); + } + return 
GenerateAndAddExternalFile(options, file_data, file_id, + allow_global_seqno, sort_data, true_data); + } + + Status DeprecatedAddFile(const std::vector& files, + bool move_files = false, + bool skip_snapshot_check = false) { + IngestExternalFileOptions opts; + opts.move_files = move_files; + opts.snapshot_consistency = !skip_snapshot_check; + opts.allow_global_seqno = false; + opts.allow_blocking_flush = false; + return db_->IngestExternalFile(files, opts); + } + ~ExternalSSTFileTest() { test::DestroyDir(env_, sst_files_dir_); } protected: + int last_file_id_ = 0; std::string sst_files_dir_; }; TEST_F(ExternalSSTFileTest, Basic) { do { Options options = CurrentOptions(); - options.env = env_; SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); @@ -149,7 +215,7 @@ TEST_F(ExternalSSTFileTest, Basic) { DestroyAndReopen(options); // Add file using file path - s = db_->AddFile(std::vector(1, file1)); + s = DeprecatedAddFile({file1}); ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); for (int k = 0; k < 100; k++) { @@ -159,11 +225,11 @@ TEST_F(ExternalSSTFileTest, Basic) { // Add file while holding a snapshot will fail const Snapshot* s1 = db_->GetSnapshot(); if (s1 != nullptr) { - ASSERT_NOK(db_->AddFile(std::vector(1, file2_info))); + ASSERT_NOK(DeprecatedAddFile({file2})); db_->ReleaseSnapshot(s1); } // We can add the file after releaseing the snapshot - ASSERT_OK(db_->AddFile(std::vector(1, file2_info))); + ASSERT_OK(DeprecatedAddFile({file2})); ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); for (int k = 0; k < 200; k++) { @@ -171,11 +237,11 @@ TEST_F(ExternalSSTFileTest, Basic) { } // This file has overlapping values with the exisitng data - s = db_->AddFile(std::vector(1, file3)); + s = DeprecatedAddFile({file3}); ASSERT_FALSE(s.ok()) << s.ToString(); // This file has overlapping values with the exisitng data - s = db_->AddFile(std::vector(1, file4_info)); + s = DeprecatedAddFile({file4}); ASSERT_FALSE(s.ok()) 
<< s.ToString(); // Overwrite values of keys divisible by 5 @@ -185,7 +251,7 @@ TEST_F(ExternalSSTFileTest, Basic) { ASSERT_NE(db_->GetLatestSequenceNumber(), 0U); // Key range of file5 (400 => 499) dont overlap with any keys in DB - ASSERT_OK(db_->AddFile(std::vector(1, file5))); + ASSERT_OK(DeprecatedAddFile({file5})); // Make sure values are correct before and after flush/compaction for (int i = 0; i < 2; i++) { @@ -214,13 +280,13 @@ TEST_F(ExternalSSTFileTest, Basic) { } // We deleted range (400 => 499) but cannot add file5 because // of the range tombstones - ASSERT_NOK(db_->AddFile(std::vector(1, file5))); + ASSERT_NOK(DeprecatedAddFile({file5})); // Compacting the DB will remove the tombstones ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); // Now we can add the file - ASSERT_OK(db_->AddFile(std::vector(1, file5))); + ASSERT_OK(DeprecatedAddFile({file5})); // Verify values of file5 in DB for (int k = 400; k < 500; k++) { @@ -281,7 +347,6 @@ class SstFileWriterCollectorFactory : public TablePropertiesCollectorFactory { TEST_F(ExternalSSTFileTest, AddList) { do { Options options = CurrentOptions(); - options.env = env_; auto abc_collector = std::make_shared("abc"); auto xyz_collector = std::make_shared("xyz"); @@ -375,22 +440,14 @@ TEST_F(ExternalSSTFileTest, AddList) { std::vector file_list2({file5}); std::vector file_list3({file3, file4}); - std::vector info_list0({file1_info, file2_info}); - std::vector info_list1( - {file3_info, file2_info, file1_info}); - std::vector info_list2({file5_info}); - std::vector info_list3({file3_info, file4_info}); - DestroyAndReopen(options); // This list of files have key ranges are overlapping with each other - s = db_->AddFile(file_list1); - ASSERT_FALSE(s.ok()) << s.ToString(); - s = db_->AddFile(info_list1); + s = DeprecatedAddFile(file_list1); ASSERT_FALSE(s.ok()) << s.ToString(); // Add files using file path list - s = db_->AddFile(file_list0); + s = DeprecatedAddFile(file_list0); 
ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); for (int k = 0; k < 200; k++) { @@ -411,11 +468,11 @@ TEST_F(ExternalSSTFileTest, AddList) { // Add file while holding a snapshot will fail const Snapshot* s1 = db_->GetSnapshot(); if (s1 != nullptr) { - ASSERT_NOK(db_->AddFile(info_list2)); + ASSERT_NOK(DeprecatedAddFile(file_list2)); db_->ReleaseSnapshot(s1); } // We can add the file after releaseing the snapshot - ASSERT_OK(db_->AddFile(info_list2)); + ASSERT_OK(DeprecatedAddFile(file_list2)); ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); for (int k = 0; k < 300; k++) { ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); @@ -432,9 +489,7 @@ TEST_F(ExternalSSTFileTest, AddList) { } // This file list has overlapping values with the exisitng data - s = db_->AddFile(file_list3); - ASSERT_FALSE(s.ok()) << s.ToString(); - s = db_->AddFile(info_list3); + s = DeprecatedAddFile(file_list3); ASSERT_FALSE(s.ok()) << s.ToString(); // Overwrite values of keys divisible by 5 @@ -466,13 +521,13 @@ TEST_F(ExternalSSTFileTest, AddList) { } // We deleted range (200 => 299) but cannot add file5 because // of the range tombstones - ASSERT_NOK(db_->AddFile(file_list2)); + ASSERT_NOK(DeprecatedAddFile(file_list2)); // Compacting the DB will remove the tombstones ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); // Now we can add the file - ASSERT_OK(db_->AddFile(file_list2)); + ASSERT_OK(DeprecatedAddFile(file_list2)); // Verify values of file5 in DB for (int k = 200; k < 300; k++) { @@ -487,7 +542,6 @@ TEST_F(ExternalSSTFileTest, AddList) { TEST_F(ExternalSSTFileTest, AddListAtomicity) { do { Options options = CurrentOptions(); - options.env = env_; SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); @@ -512,13 +566,13 @@ TEST_F(ExternalSSTFileTest, AddListAtomicity) { ASSERT_EQ(files_info[i].largest_key, Key((i + 1) * 100 - 1)); } files.push_back(sst_files_dir_ + "file" + std::to_string(n) + ".sst"); - auto s = 
db_->AddFile(files); + auto s = DeprecatedAddFile(files); ASSERT_NOK(s) << s.ToString(); for (int k = 0; k < n * 100; k++) { ASSERT_EQ("NOT_FOUND", Get(Key(k))); } - s = db_->AddFile(files_info); - ASSERT_OK(s); + files.pop_back(); + ASSERT_OK(DeprecatedAddFile(files)); for (int k = 0; k < n * 100; k++) { std::string value = Key(k) + "_val"; ASSERT_EQ(Get(Key(k)), value); @@ -532,7 +586,6 @@ TEST_F(ExternalSSTFileTest, AddListAtomicity) { // This situation may result in deleting the file while it's being added. TEST_F(ExternalSSTFileTest, PurgeObsoleteFilesBug) { Options options = CurrentOptions(); - options.env = env_; SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); // file1.sst (0 => 500) @@ -563,7 +616,7 @@ TEST_F(ExternalSSTFileTest, PurgeObsoleteFilesBug) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - s = db_->AddFile(std::vector(1, sst_file_path)); + s = DeprecatedAddFile({sst_file_path}); ASSERT_OK(s); for (int i = 0; i < 500; i++) { @@ -577,7 +630,6 @@ TEST_F(ExternalSSTFileTest, PurgeObsoleteFilesBug) { TEST_F(ExternalSSTFileTest, NoCopy) { Options options = CurrentOptions(); - options.env = env_; const ImmutableCFOptions ioptions(options); SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); @@ -623,19 +675,16 @@ TEST_F(ExternalSSTFileTest, NoCopy) { ASSERT_EQ(file3_info.num_entries, 15); ASSERT_EQ(file3_info.smallest_key, Key(110)); ASSERT_EQ(file3_info.largest_key, Key(124)); - s = db_->AddFile(std::vector(1, file1_info), - true /* move file */); + s = DeprecatedAddFile({file1}, true /* move file */); ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_EQ(Status::NotFound(), env_->FileExists(file1)); - s = db_->AddFile(std::vector(1, file2_info), - false /* copy file */); + s = DeprecatedAddFile({file2}, false /* copy file */); ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_OK(env_->FileExists(file2)); // This file have overlapping values with the exisitng data - s = db_->AddFile(std::vector(1, 
file2_info), - true /* move file */); + s = DeprecatedAddFile({file2}, true /* move file */); ASSERT_FALSE(s.ok()) << s.ToString(); ASSERT_OK(env_->FileExists(file3)); @@ -646,7 +695,6 @@ TEST_F(ExternalSSTFileTest, NoCopy) { TEST_F(ExternalSSTFileTest, SkipSnapshot) { Options options = CurrentOptions(); - options.env = env_; SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); @@ -678,20 +726,19 @@ TEST_F(ExternalSSTFileTest, SkipSnapshot) { ASSERT_EQ(file2_info.smallest_key, Key(100)); ASSERT_EQ(file2_info.largest_key, Key(299)); - ASSERT_OK(db_->AddFile(std::vector(1, file1_info))); + ASSERT_OK(DeprecatedAddFile({file1})); // Add file will fail when holding snapshot and use the default // skip_snapshot_check to false const Snapshot* s1 = db_->GetSnapshot(); if (s1 != nullptr) { - ASSERT_NOK(db_->AddFile(std::vector(1, file2_info))); + ASSERT_NOK(DeprecatedAddFile({file2})); } // Add file will success when set skip_snapshot_check to true even db holding // snapshot if (s1 != nullptr) { - ASSERT_OK(db_->AddFile(std::vector(1, file2_info), - false, true)); + ASSERT_OK(DeprecatedAddFile({file2}, false, true)); db_->ReleaseSnapshot(s1); } @@ -712,8 +759,7 @@ TEST_F(ExternalSSTFileTest, SkipSnapshot) { // check that we have change the old key ASSERT_EQ(Get(Key(300)), "NOT_FOUND"); const Snapshot* s2 = db_->GetSnapshot(); - ASSERT_OK(db_->AddFile(std::vector(1, file3_info), false, - true)); + ASSERT_OK(DeprecatedAddFile({file3}, false, true)); ASSERT_EQ(Get(Key(300)), Key(300) + ("_val")); ASSERT_EQ(Get(Key(300), s2), Key(300) + ("_val")); @@ -789,7 +835,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) { files_to_add.push_back(file_names[file_idx + 1]); } - Status s = db_->AddFile(files_to_add, move_file); + Status s = DeprecatedAddFile(files_to_add, move_file); if (s.ok()) { files_added += static_cast(files_to_add.size()); } @@ -886,7 +932,7 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) { ASSERT_OK(s); // Insert the generated file - s = 
db_->AddFile(std::vector(1, file_info)); + s = DeprecatedAddFile({file_name}); auto it = true_data.lower_bound(Key(range_start)); if (it != true_data.end() && it->first <= Key(range_end)) { @@ -937,21 +983,18 @@ TEST_F(ExternalSSTFileTest, PickedLevel) { options.disable_auto_compactions = false; options.level0_file_num_compaction_trigger = 4; options.num_levels = 4; - options.env = env_; DestroyAndReopen(options); - std::vector> file_to_keys; + std::map true_data; // File 0 will go to last level (L3) - file_to_keys.push_back({1, 10}); - ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 10}, -1, false, false, + &true_data)); EXPECT_EQ(FilesPerLevel(), "0,0,0,1"); // File 1 will go to level L2 (since it overlap with file 0 in L3) - file_to_keys.push_back({2, 9}); - ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {2, 9}, -1, false, false, + &true_data)); EXPECT_EQ(FilesPerLevel(), "0,0,1,1"); rocksdb::SyncPoint::GetInstance()->LoadDependency({ @@ -967,6 +1010,8 @@ TEST_F(ExternalSSTFileTest, PickedLevel) { for (int i = 0; i < 4; i++) { ASSERT_OK(Put(Key(3), Key(3) + "put")); ASSERT_OK(Put(Key(8), Key(8) + "put")); + true_data[Key(3)] = Key(3) + "put"; + true_data[Key(8)] = Key(8) + "put"; ASSERT_OK(Flush()); } @@ -978,15 +1023,13 @@ TEST_F(ExternalSSTFileTest, PickedLevel) { // This file overlaps with file 0 (L3), file 1 (L2) and the // output of compaction going to L1 - file_to_keys.push_back({4, 7}); - ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1, false, false, + &true_data)); EXPECT_EQ(FilesPerLevel(), "5,0,1,1"); // This file does not overlap with any file or with the running compaction - file_to_keys.push_back({9000, 9001}); - 
ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {9000, 9001}, -1, false, false, + &true_data)); EXPECT_EQ(FilesPerLevel(), "5,0,1,2"); // Hold compaction from finishing @@ -995,17 +1038,8 @@ TEST_F(ExternalSSTFileTest, PickedLevel) { dbfull()->TEST_WaitForCompact(); EXPECT_EQ(FilesPerLevel(), "1,1,1,2"); - for (size_t file_id = 0; file_id < file_to_keys.size(); file_id++) { - for (auto& key_id : file_to_keys[file_id]) { - std::string k = Key(key_id); - std::string v = k + ToString(file_id); - if (key_id == 3 || key_id == 8) { - v = k + "put"; - } - - ASSERT_EQ(Get(k), v); - } - } + size_t kcnt = 0; + VerifyDBFromMap(true_data, &kcnt, false); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } @@ -1015,7 +1049,6 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) { options.disable_auto_compactions = false; options.level0_file_num_compaction_trigger = 3; options.num_levels = 2; - options.env = env_; DestroyAndReopen(options); std::vector file_keys; @@ -1102,7 +1135,6 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) { options.disable_auto_compactions = false; options.level0_file_num_compaction_trigger = 2; options.num_levels = 2; - options.env = env_; DestroyAndReopen(options); std::function bg_compact = [&]() { @@ -1160,8 +1192,8 @@ TEST_F(ExternalSSTFileTest, PickedLevelDynamic) { options.level0_file_num_compaction_trigger = 4; options.level_compaction_dynamic_level_bytes = true; options.num_levels = 4; - options.env = env_; DestroyAndReopen(options); + std::map true_data; rocksdb::SyncPoint::GetInstance()->LoadDependency({ {"ExternalSSTFileTest::PickedLevelDynamic:0", @@ -1177,9 +1209,11 @@ TEST_F(ExternalSSTFileTest, PickedLevelDynamic) { for (int i = 0; i < 4; i++) { for (int k = 20; k <= 30; k++) { ASSERT_OK(Put(Key(k), Key(k) + "put")); + true_data[Key(k)] = Key(k) + "put"; } for (int k = 50; k <= 60; k++) { ASSERT_OK(Put(Key(k), Key(k) + "put")); + 
true_data[Key(k)] = Key(k) + "put"; } ASSERT_OK(Flush()); } @@ -1187,19 +1221,16 @@ TEST_F(ExternalSSTFileTest, PickedLevelDynamic) { // Wait for BackgroundCompaction() to be called TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:0"); TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:1"); - std::vector> file_to_keys; // This file overlaps with the output of the compaction (going to L3) // so the file will be added to L0 since L3 is the base level - file_to_keys.push_back({31, 32, 33, 34}); - ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {31, 32, 33, 34}, -1, false, + false, &true_data)); EXPECT_EQ(FilesPerLevel(), "5"); // This file does not overlap with the current running compactiong - file_to_keys.push_back({9000, 9001}); - ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {9000, 9001}, -1, false, false, + &true_data)); EXPECT_EQ(FilesPerLevel(), "5,0,0,1"); // Hold compaction from finishing @@ -1213,76 +1244,54 @@ TEST_F(ExternalSSTFileTest, PickedLevelDynamic) { options.disable_auto_compactions = true; Reopen(options); - file_to_keys.push_back({1, 15, 19}); - ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 15, 19}, -1, false, false, + &true_data)); ASSERT_EQ(FilesPerLevel(), "1,0,0,3"); - file_to_keys.push_back({1000, 1001, 1002}); - ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1001, 1002}, -1, false, + false, &true_data)); ASSERT_EQ(FilesPerLevel(), "1,0,0,4"); - file_to_keys.push_back({500, 600, 700}); - ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {500, 
600, 700}, -1, false, + false, &true_data)); ASSERT_EQ(FilesPerLevel(), "1,0,0,5"); // File 5 overlaps with file 2 (L3 / base level) - file_to_keys.push_back({2, 10}); - ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {2, 10}, -1, false, false, + &true_data)); ASSERT_EQ(FilesPerLevel(), "2,0,0,5"); // File 6 overlaps with file 2 (L3 / base level) and file 5 (L0) - file_to_keys.push_back({3, 9}); - ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {3, 9}, -1, false, false, + &true_data)); ASSERT_EQ(FilesPerLevel(), "3,0,0,5"); // Verify data in files - for (size_t file_id = 0; file_id < file_to_keys.size(); file_id++) { - for (auto& key_id : file_to_keys[file_id]) { - std::string k = Key(key_id); - std::string v = k + ToString(file_id); - - ASSERT_EQ(Get(k), v); - } - } + size_t kcnt = 0; + VerifyDBFromMap(true_data, &kcnt, false); // Write range [5 => 10] to L0 for (int i = 5; i <= 10; i++) { std::string k = Key(i); std::string v = k + "put"; ASSERT_OK(Put(k, v)); + true_data[k] = v; } ASSERT_OK(Flush()); ASSERT_EQ(FilesPerLevel(), "4,0,0,5"); // File 7 overlaps with file 4 (L3) - file_to_keys.push_back({650, 651, 652}); - ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(), - file_to_keys.size() - 1)); + ASSERT_OK(GenerateAndAddExternalFile(options, {650, 651, 652}, -1, false, + false, &true_data)); ASSERT_EQ(FilesPerLevel(), "5,0,0,5"); - for (size_t file_id = 0; file_id < file_to_keys.size(); file_id++) { - for (auto& key_id : file_to_keys[file_id]) { - std::string k = Key(key_id); - std::string v = k + ToString(file_id); - if (key_id >= 5 && key_id <= 10) { - v = k + "put"; - } - - ASSERT_EQ(Get(k), v); - } - } + VerifyDBFromMap(true_data, &kcnt, false); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } TEST_F(ExternalSSTFileTest, 
AddExternalSstFileWithCustomCompartor) { Options options = CurrentOptions(); - options.env = env_; options.comparator = ReverseBytewiseComparator(); DestroyAndReopen(options); @@ -1314,19 +1323,19 @@ TEST_F(ExternalSSTFileTest, AddExternalSstFileWithCustomCompartor) { // These 2nd and 3rd files overlap with each other in_files = {generated_files[0], generated_files[4], generated_files[5], generated_files[7]}; - ASSERT_NOK(db_->AddFile(in_files)); + ASSERT_NOK(DeprecatedAddFile(in_files)); // These 2 files dont overlap with each other in_files = {generated_files[0], generated_files[2]}; - ASSERT_OK(db_->AddFile(in_files)); + ASSERT_OK(DeprecatedAddFile(in_files)); // These 2 files dont overlap with each other but overlap with keys in DB in_files = {generated_files[3], generated_files[7]}; - ASSERT_NOK(db_->AddFile(in_files)); + ASSERT_NOK(DeprecatedAddFile(in_files)); // Files dont overlap and dont overlap with DB key range in_files = {generated_files[4], generated_files[6], generated_files[8]}; - ASSERT_OK(db_->AddFile(in_files)); + ASSERT_OK(DeprecatedAddFile(in_files)); for (int i = 0; i < 100; i++) { if (i % 20 <= 14) { @@ -1400,9 +1409,282 @@ TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) { ASSERT_OK(sst_file_writer.Add("ZAAAX" + suffix, "VAL")); ASSERT_OK(sst_file_writer.Finish()); - ASSERT_OK(db_->AddFile(std::vector(1, file_path))); + ASSERT_OK(DeprecatedAddFile({file_path})); +} + +TEST_F(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) { + Options options = CurrentOptions(); + options.IncreaseParallelism(20); + options.level0_slowdown_writes_trigger = 256; + options.level0_stop_writes_trigger = 256; + + for (int iter = 0; iter < 2; iter++) { + bool write_to_memtable = (iter == 0); + DestroyAndReopen(options); + + Random rnd(301); + std::map true_data; + for (int i = 0; i < 2000; i++) { + std::vector> random_data; + for (int j = 0; j < 100; j++) { + std::string k; + std::string v; + test::RandomString(&rnd, rnd.Next() % 20, &k); + 
test::RandomString(&rnd, rnd.Next() % 50, &v); + random_data.emplace_back(k, v); + } + + if (write_to_memtable && rnd.OneIn(4)) { + // 25% of writes go through memtable + for (auto& entry : random_data) { + ASSERT_OK(Put(entry.first, entry.second)); + true_data[entry.first] = entry.second; + } + } else { + ASSERT_OK(GenerateAndAddExternalFile(options, random_data, -1, true, + true, &true_data)); + } + } + size_t kcnt = 0; + VerifyDBFromMap(true_data, &kcnt, false); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + VerifyDBFromMap(true_data, &kcnt, false); + } +} + +TEST_F(ExternalSSTFileTest, IngestFileWithGlobalSeqnoAssignedLevel) { + Options options = CurrentOptions(); + options.num_levels = 5; + options.disable_auto_compactions = true; + DestroyAndReopen(options); + std::vector> file_data; + std::map true_data; + + // Insert 100 -> 200 into the memtable + for (int i = 100; i <= 200; i++) { + ASSERT_OK(Put(Key(i), "memtable")); + true_data[Key(i)] = "memtable"; + } + + // Insert 0 -> 20 using AddFile + file_data.clear(); + for (int i = 0; i <= 20; i++) { + file_data.emplace_back(Key(i), "L4"); + } + ASSERT_OK(GenerateAndAddExternalFile(options, file_data, -1, true, false, + &true_data)); + + // This file dont overlap with anything in the DB, will go to L4 + ASSERT_EQ("0,0,0,0,1", FilesPerLevel()); + + // Insert 80 -> 130 using AddFile + file_data.clear(); + for (int i = 80; i <= 130; i++) { + file_data.emplace_back(Key(i), "L0"); + } + ASSERT_OK(GenerateAndAddExternalFile(options, file_data, -1, true, false, + &true_data)); + + // This file overlap with the memtable, so it will flush it and add + // it self to L0 + ASSERT_EQ("2,0,0,0,1", FilesPerLevel()); + + // Insert 30 -> 50 using AddFile + file_data.clear(); + for (int i = 30; i <= 50; i++) { + file_data.emplace_back(Key(i), "L4"); + } + ASSERT_OK(GenerateAndAddExternalFile(options, file_data, -1, true, false, + &true_data)); + + // This file dont overlap with anything in the DB and fit in L4 as 
well + ASSERT_EQ("2,0,0,0,2", FilesPerLevel()); + + // Insert 10 -> 40 using AddFile + file_data.clear(); + for (int i = 10; i <= 40; i++) { + file_data.emplace_back(Key(i), "L3"); + } + ASSERT_OK(GenerateAndAddExternalFile(options, file_data, -1, true, false, + &true_data)); + + // This file overlap with files in L4, we will ingest it in L3 + ASSERT_EQ("2,0,0,1,2", FilesPerLevel()); + + size_t kcnt = 0; + VerifyDBFromMap(true_data, &kcnt, false); } +TEST_F(ExternalSSTFileTest, IngestFileWithGlobalSeqnoPickedSeqno) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + std::map true_data; + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6}, -1, true, + false, &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); + + ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13}, -1, true, + false, &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 4, 6}, -1, true, false, + &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1); + + ASSERT_OK(GenerateAndAddExternalFile(options, {11, 15, 19}, -1, true, false, + &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); + + ASSERT_OK(GenerateAndAddExternalFile(options, {120, 130}, -1, true, false, + &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 130}, -1, true, false, + &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3); + + // Write some keys through normal write path + for (int i = 0; i < 50; i++) { + ASSERT_OK(Put(Key(i), "memtable")); + true_data[Key(i)] = "memtable"; + } + 
SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber(); + + ASSERT_OK(GenerateAndAddExternalFile(options, {60, 61, 62}, -1, true, false, + &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno); + + ASSERT_OK(GenerateAndAddExternalFile(options, {40, 41, 42}, -1, true, false, + &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1); + + ASSERT_OK(GenerateAndAddExternalFile(options, {20, 30, 40}, -1, true, false, + &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2); + + const Snapshot* snapshot = db_->GetSnapshot(); + + // We will need a seqno for the file regardless if the file overwrite + // keys in the DB or not because we have a snapshot + ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1002}, -1, true, false, + &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3); + + ASSERT_OK(GenerateAndAddExternalFile(options, {2000, 3002}, -1, true, false, + &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4); + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 20, 40, 100, 150}, -1, true, + false, &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); + + db_->ReleaseSnapshot(snapshot); + + ASSERT_OK(GenerateAndAddExternalFile(options, {5000, 5001}, -1, true, false, + &true_data)); + // No snapshot anymore, no need to assign a seqno + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); + + size_t kcnt = 0; + VerifyDBFromMap(true_data, &kcnt, false); +} + +TEST_F(ExternalSSTFileTest, IngestFileWithGlobalSeqnoMemtableFlush) { + Options 
options = CurrentOptions(); + DestroyAndReopen(options); + uint64_t entries_in_memtable; + std::map true_data; + + for (int k : {10, 20, 40, 80}) { + ASSERT_OK(Put(Key(k), "memtable")); + true_data[Key(k)] = "memtable"; + } + db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, + &entries_in_memtable); + ASSERT_GE(entries_in_memtable, 1); + + // No need for flush + ASSERT_OK(GenerateAndAddExternalFile(options, {90, 100, 110}, -1, true, false, + &true_data)); + db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, + &entries_in_memtable); + ASSERT_GE(entries_in_memtable, 1); + + // This file will flush the memtable + ASSERT_OK(GenerateAndAddExternalFile(options, {19, 20, 21}, -1, true, false, + &true_data)); + db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, + &entries_in_memtable); + ASSERT_EQ(entries_in_memtable, 0); + + for (int k : {200, 201, 205, 206}) { + ASSERT_OK(Put(Key(k), "memtable")); + true_data[Key(k)] = "memtable"; + } + db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, + &entries_in_memtable); + ASSERT_GE(entries_in_memtable, 1); + + // No need for flush, this file keys fit between the memtable keys + ASSERT_OK(GenerateAndAddExternalFile(options, {202, 203, 204}, -1, true, + false, &true_data)); + db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, + &entries_in_memtable); + ASSERT_GE(entries_in_memtable, 1); + + // This file will flush the memtable + ASSERT_OK(GenerateAndAddExternalFile(options, {206, 207}, -1, true, false, + &true_data)); + db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, + &entries_in_memtable); + ASSERT_EQ(entries_in_memtable, 0); + + size_t kcnt = 0; + VerifyDBFromMap(true_data, &kcnt, false); +} + +TEST_F(ExternalSSTFileTest, L0SortingIssue) { + Options options = CurrentOptions(); + options.num_levels = 2; + DestroyAndReopen(options); + std::map true_data; + + ASSERT_OK(Put(Key(1), "memtable")); + ASSERT_OK(Put(Key(10), "memtable")); + + // No Flush needed, 
No global seqno needed, Ingest in L1 + ASSERT_OK(GenerateAndAddExternalFile(options, {7, 8}, -1, true, false)); + // No Flush needed, but need a global seqno, Ingest in L0 + ASSERT_OK(GenerateAndAddExternalFile(options, {7, 8}, -1, true, false)); + printf("%s\n", FilesPerLevel().c_str()); + + // Overwrite what we added using external files + ASSERT_OK(Put(Key(7), "memtable")); + ASSERT_OK(Put(Key(8), "memtable")); + + // Read values from memtable + ASSERT_EQ(Get(Key(7)), "memtable"); + ASSERT_EQ(Get(Key(8)), "memtable"); + + // Flush and read from L0 + ASSERT_OK(Flush()); + printf("%s\n", FilesPerLevel().c_str()); + ASSERT_EQ(Get(Key(7)), "memtable"); + ASSERT_EQ(Get(Key(8)), "memtable"); +} #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/db/version_builder.cc b/db/version_builder.cc index fac3c8ab2..7add24fa5 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -32,12 +32,12 @@ namespace rocksdb { bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { - if (a->smallest_seqno != b->smallest_seqno) { - return a->smallest_seqno > b->smallest_seqno; - } if (a->largest_seqno != b->largest_seqno) { return a->largest_seqno > b->largest_seqno; } + if (a->smallest_seqno != b->smallest_seqno) { + return a->smallest_seqno > b->smallest_seqno; + } // Break ties by file number return a->fd.GetNumber() > b->fd.GetNumber(); } @@ -146,13 +146,22 @@ class VersionBuilder::Rep { abort(); } - if (!(f1->largest_seqno > f2->largest_seqno || - // We can have multiple files with seqno = 0 as a result of - // using DB::AddFile() - (f1->largest_seqno == 0 && f2->largest_seqno == 0))) { - fprintf(stderr, - "L0 files seqno missmatch %" PRIu64 " vs. 
%" PRIu64 "\n", - f1->largest_seqno, f2->largest_seqno); + if (f2->smallest_seqno == f2->largest_seqno) { + // This is an external file that we ingested + SequenceNumber external_file_seqno = f2->smallest_seqno; + if (!(external_file_seqno < f1->largest_seqno || + external_file_seqno == 0)) { + fprintf(stderr, "L0 file with seqno %" PRIu64 " %" PRIu64 + " vs. file with global_seqno %" PRIu64 "\n", + f1->smallest_seqno, f1->largest_seqno, + external_file_seqno); + abort(); + } + } else if (f1->smallest_seqno <= f2->smallest_seqno) { + fprintf(stderr, "L0 files seqno %" PRIu64 " %" PRIu64 + " vs. %" PRIu64 " %" PRIu64 "\n", + f1->smallest_seqno, f1->largest_seqno, f2->smallest_seqno, + f2->largest_seqno); abort(); } } else { diff --git a/db/version_set.cc b/db/version_set.cc index 730c797d5..df53181c2 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -808,41 +808,51 @@ void Version::AddIterators(const ReadOptions& read_options, MergeIteratorBuilder* merge_iter_builder) { assert(storage_info_.finalized_); - if (storage_info_.num_non_empty_levels() == 0) { - // No file in the Version. 
+ for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) { + AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level); + } +} + +void Version::AddIteratorsForLevel(const ReadOptions& read_options, + const EnvOptions& soptions, + MergeIteratorBuilder* merge_iter_builder, + int level) { + assert(storage_info_.finalized_); + if (level >= storage_info_.num_non_empty_levels()) { + // This is an empty level + return; + } else if (storage_info_.LevelFilesBrief(level).num_files == 0) { + // No files in this level return; } auto* arena = merge_iter_builder->GetArena(); - - // Merge all level zero files together since they may overlap - for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) { - const auto& file = storage_info_.LevelFilesBrief(0).files[i]; - merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( - read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr, - cfd_->internal_stats()->GetFileReadHist(0), false, arena, - false /* skip_filters */, 0 /* level */)); - } - - // For levels > 0, we can use a concatenating iterator that sequentially - // walks through the non-overlapping files in the level, opening them - // lazily. 
- for (int level = 1; level < storage_info_.num_non_empty_levels(); level++) { - if (storage_info_.LevelFilesBrief(level).num_files != 0) { - auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState)); - auto* state = new (mem) LevelFileIteratorState( - cfd_->table_cache(), read_options, soptions, - cfd_->internal_comparator(), - cfd_->internal_stats()->GetFileReadHist(level), - false /* for_compaction */, - cfd_->ioptions()->prefix_extractor != nullptr, IsFilterSkipped(level), - level, nullptr /* range_del_agg */); - mem = arena->AllocateAligned(sizeof(LevelFileNumIterator)); - auto* first_level_iter = new (mem) LevelFileNumIterator( - cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level)); - merge_iter_builder->AddIterator( - NewTwoLevelIterator(state, first_level_iter, arena, false)); + if (level == 0) { + // Merge all level zero files together since they may overlap + for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) { + const auto& file = storage_info_.LevelFilesBrief(0).files[i]; + merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( + read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr, + cfd_->internal_stats()->GetFileReadHist(0), false, arena, + false /* skip_filters */, 0 /* level */)); } + } else { + // For levels > 0, we can use a concatenating iterator that sequentially + // walks through the non-overlapping files in the level, opening them + // lazily. 
+ auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState)); + auto* state = new (mem) LevelFileIteratorState( + cfd_->table_cache(), read_options, soptions, + cfd_->internal_comparator(), + cfd_->internal_stats()->GetFileReadHist(level), + false /* for_compaction */, + cfd_->ioptions()->prefix_extractor != nullptr, IsFilterSkipped(level), + level, nullptr /* range_del_agg */); + mem = arena->AllocateAligned(sizeof(LevelFileNumIterator)); + auto* first_level_iter = new (mem) LevelFileNumIterator( + cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level)); + merge_iter_builder->AddIterator( + NewTwoLevelIterator(state, first_level_iter, arena, false)); } } diff --git a/db/version_set.h b/db/version_set.h index f84d21f77..ceeca3d5b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -435,6 +435,10 @@ class Version { void AddIterators(const ReadOptions&, const EnvOptions& soptions, MergeIteratorBuilder* merger_iter_builder); + void AddIteratorsForLevel(const ReadOptions&, const EnvOptions& soptions, + MergeIteratorBuilder* merger_iter_builder, + int level); + // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. // Uses *operands to store merge_operator operations to apply later. 
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 39111f5d2..283265909 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -31,6 +31,11 @@ #undef DeleteFile #endif +#if defined(__GNUC__) || defined(__clang__) +#define ROCKSDB_DEPRECATED_FUNC __attribute__((__deprecated__)) +#elif _WIN32 +#define ROCKSDB_DEPRECATED_FUNC __declspec(deprecated) +#endif namespace rocksdb { @@ -589,29 +594,20 @@ class DB { return CompactRange(options, DefaultColumnFamily(), begin, end); } -#if defined(__GNUC__) || defined(__clang__) - __attribute__((__deprecated__)) -#elif _WIN32 - __declspec(deprecated) -#endif - virtual Status - CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, - const Slice* end, bool change_level = false, - int target_level = -1, uint32_t target_path_id = 0) { + ROCKSDB_DEPRECATED_FUNC virtual Status CompactRange( + ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end, + bool change_level = false, int target_level = -1, + uint32_t target_path_id = 0) { CompactRangeOptions options; options.change_level = change_level; options.target_level = target_level; options.target_path_id = target_path_id; return CompactRange(options, column_family, begin, end); } -#if defined(__GNUC__) || defined(__clang__) - __attribute__((__deprecated__)) -#elif _WIN32 - __declspec(deprecated) -#endif - virtual Status - CompactRange(const Slice* begin, const Slice* end, bool change_level = false, - int target_level = -1, uint32_t target_path_id = 0) { + + ROCKSDB_DEPRECATED_FUNC virtual Status CompactRange( + const Slice* begin, const Slice* end, bool change_level = false, + int target_level = -1, uint32_t target_path_id = 0) { CompactRangeOptions options; options.change_level = change_level; options.target_level = target_level; @@ -803,79 +799,126 @@ class DB { GetColumnFamilyMetaData(DefaultColumnFamily(), metadata); } - // Batch load table files whose paths stored in "file_path_list" into - // "column_family", a vector of 
ExternalSstFileInfo can be used - // instead of "file_path_list" to do a blind batch add that wont - // need to read the file, move_file can be set to true to - // move the files instead of copying them, skip_snapshot_check can be set to - // true to ignore the snapshot, make sure that you know that when you use it, - // snapshots see the data that is added in the new files. + // IngestExternalFile() will load a list of external SST files (1) into the DB + // We will try to find the lowest possible level that the file can fit in, and + // ingest the file into this level (2). A file that have a key range that + // overlap with the memtable key range will require us to Flush the memtable + // first before ingesting the file. // - // Current Requirements: - // (1) The key ranges of the files don't overlap with each other - // (2) The key range of any file in list doesn't overlap with - // existing keys or tombstones in DB. - // (3) No snapshots are held (check skip_snapshot_check to skip this check). 
- // - // Notes: We will try to ingest the files to the lowest possible level - // even if the file compression dont match the level compression - virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::vector& file_path_list, - bool move_file = false, bool skip_snapshot_check = false) = 0; - virtual Status AddFile(const std::vector& file_path_list, - bool move_file = false, bool skip_snapshot_check = false) { - return AddFile(DefaultColumnFamily(), file_path_list, move_file, skip_snapshot_check); + // (1) External SST files can be created using SstFileWriter + // (2) We will try to ingest the files to the lowest possible level + // even if the file compression dont match the level compression + virtual Status IngestExternalFile( + ColumnFamilyHandle* column_family, + const std::vector& external_files, + const IngestExternalFileOptions& options) = 0; + + virtual Status IngestExternalFile( + const std::vector& external_files, + const IngestExternalFileOptions& options) { + return IngestExternalFile(DefaultColumnFamily(), external_files, options); } -#if defined(__GNUC__) || defined(__clang__) - __attribute__((__deprecated__)) -#elif _WIN32 - __declspec(deprecated) -#endif - virtual Status - AddFile(ColumnFamilyHandle* column_family, const std::string& file_path, - bool move_file = false, bool skip_snapshot_check = false) { - return AddFile(column_family, std::vector(1, file_path), - move_file, skip_snapshot_check); + + // AddFile() is deprecated, please use IngestExternalFile() + ROCKSDB_DEPRECATED_FUNC virtual Status AddFile( + ColumnFamilyHandle* column_family, + const std::vector& file_path_list, bool move_file = false, + bool skip_snapshot_check = false) { + IngestExternalFileOptions ifo; + ifo.move_files = move_file; + ifo.snapshot_consistency = !skip_snapshot_check; + ifo.allow_global_seqno = false; + ifo.allow_blocking_flush = false; + return IngestExternalFile(column_family, file_path_list, ifo); } -#if defined(__GNUC__) || defined(__clang__) 
- __attribute__((__deprecated__)) -#elif _WIN32 - __declspec(deprecated) -#endif - virtual Status - AddFile(const std::string& file_path, bool move_file = false, bool skip_snapshot_check = false) { - return AddFile(DefaultColumnFamily(), - std::vector(1, file_path), move_file, skip_snapshot_check); + + ROCKSDB_DEPRECATED_FUNC virtual Status AddFile( + const std::vector& file_path_list, bool move_file = false, + bool skip_snapshot_check = false) { + IngestExternalFileOptions ifo; + ifo.move_files = move_file; + ifo.snapshot_consistency = !skip_snapshot_check; + ifo.allow_global_seqno = false; + ifo.allow_blocking_flush = false; + return IngestExternalFile(DefaultColumnFamily(), file_path_list, ifo); + } + + // AddFile() is deprecated, please use IngestExternalFile() + ROCKSDB_DEPRECATED_FUNC virtual Status AddFile( + ColumnFamilyHandle* column_family, const std::string& file_path, + bool move_file = false, bool skip_snapshot_check = false) { + IngestExternalFileOptions ifo; + ifo.move_files = move_file; + ifo.snapshot_consistency = !skip_snapshot_check; + ifo.allow_global_seqno = false; + ifo.allow_blocking_flush = false; + return IngestExternalFile(column_family, {file_path}, ifo); + } + + ROCKSDB_DEPRECATED_FUNC virtual Status AddFile( + const std::string& file_path, bool move_file = false, + bool skip_snapshot_check = false) { + IngestExternalFileOptions ifo; + ifo.move_files = move_file; + ifo.snapshot_consistency = !skip_snapshot_check; + ifo.allow_global_seqno = false; + ifo.allow_blocking_flush = false; + return IngestExternalFile(DefaultColumnFamily(), {file_path}, ifo); } // Load table file with information "file_info" into "column_family" - virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::vector& file_info_list, - bool move_file = false, bool skip_snapshot_check = false) = 0; - virtual Status AddFile(const std::vector& file_info_list, - bool move_file = false, bool skip_snapshot_check = false) { - return AddFile(DefaultColumnFamily(), 
file_info_list, move_file, skip_snapshot_check); + ROCKSDB_DEPRECATED_FUNC virtual Status AddFile( + ColumnFamilyHandle* column_family, + const std::vector& file_info_list, + bool move_file = false, bool skip_snapshot_check = false) { + std::vector external_files; + for (const ExternalSstFileInfo& file_info : file_info_list) { + external_files.push_back(file_info.file_path); + } + IngestExternalFileOptions ifo; + ifo.move_files = move_file; + ifo.snapshot_consistency = !skip_snapshot_check; + ifo.allow_global_seqno = false; + ifo.allow_blocking_flush = false; + return IngestExternalFile(column_family, external_files, ifo); } -#if defined(__GNUC__) || defined(__clang__) - __attribute__((__deprecated__)) -#elif _WIN32 - __declspec(deprecated) -#endif - virtual Status - AddFile(ColumnFamilyHandle* column_family, - const ExternalSstFileInfo* file_info, bool move_file = false, bool skip_snapshot_check = false) { - return AddFile(column_family, - std::vector(1, *file_info), move_file, skip_snapshot_check); + + ROCKSDB_DEPRECATED_FUNC virtual Status AddFile( + const std::vector& file_info_list, + bool move_file = false, bool skip_snapshot_check = false) { + std::vector external_files; + for (const ExternalSstFileInfo& file_info : file_info_list) { + external_files.push_back(file_info.file_path); + } + IngestExternalFileOptions ifo; + ifo.move_files = move_file; + ifo.snapshot_consistency = !skip_snapshot_check; + ifo.allow_global_seqno = false; + ifo.allow_blocking_flush = false; + return IngestExternalFile(DefaultColumnFamily(), external_files, ifo); } -#if defined(__GNUC__) || defined(__clang__) - __attribute__((__deprecated__)) -#elif _WIN32 - __declspec(deprecated) -#endif - virtual Status - AddFile(const ExternalSstFileInfo* file_info, bool move_file = false, bool skip_snapshot_check = false) { - return AddFile(DefaultColumnFamily(), - std::vector(1, *file_info), move_file, skip_snapshot_check); + + ROCKSDB_DEPRECATED_FUNC virtual Status AddFile( + 
ColumnFamilyHandle* column_family, const ExternalSstFileInfo* file_info, + bool move_file = false, bool skip_snapshot_check = false) { + IngestExternalFileOptions ifo; + ifo.move_files = move_file; + ifo.snapshot_consistency = !skip_snapshot_check; + ifo.allow_global_seqno = false; + ifo.allow_blocking_flush = false; + return IngestExternalFile(column_family, {file_info->file_path}, ifo); + } + + ROCKSDB_DEPRECATED_FUNC virtual Status AddFile( + const ExternalSstFileInfo* file_info, bool move_file = false, + bool skip_snapshot_check = false) { + IngestExternalFileOptions ifo; + ifo.move_files = move_file; + ifo.snapshot_consistency = !skip_snapshot_check; + ifo.allow_global_seqno = false; + ifo.allow_blocking_flush = false; + return IngestExternalFile(DefaultColumnFamily(), {file_info->file_path}, + ifo); } #endif // ROCKSDB_LITE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 4ce704a63..70750c926 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1617,6 +1617,21 @@ struct CompactRangeOptions { BottommostLevelCompaction::kIfHaveCompactionFilter; }; +// IngestExternalFileOptions is used by IngestExternalFile() +struct IngestExternalFileOptions { + // Can be set to true to move the files instead of copying them. + bool move_files = false; + // If set to false, an ingested file keys could appear in existing snapshots + // that where created before the file was ingested. + bool snapshot_consistency = true; + // If set to false, IngestExternalFile() will fail if the file key range + // overlaps with existing keys or tombstones in the DB. + bool allow_global_seqno = true; + // If set to false and the file key range overlaps with the memtable key range + // (memtable flush required), IngestExternalFile will fail. 
+ bool allow_blocking_flush = true; +}; + } // namespace rocksdb #endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_ diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index ed1903e2b..d223a7615 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -68,16 +68,12 @@ class StackableDB : public DB { return db_->MultiGet(options, column_family, keys, values); } - using DB::AddFile; - virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::vector& file_info_list, - bool move_file, bool skip_snapshot_check) override { - return db_->AddFile(column_family, file_info_list, move_file, skip_snapshot_check); - } - virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::vector& file_path_list, - bool move_file, bool skip_snapshot_check) override { - return db_->AddFile(column_family, file_path_list, move_file, skip_snapshot_check); + using DB::IngestExternalFile; + virtual Status IngestExternalFile( + ColumnFamilyHandle* column_family, + const std::vector& external_files, + const IngestExternalFileOptions& options) override { + return db_->IngestExternalFile(column_family, external_files, options); } using DB::KeyMayExist; diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index eec6701e4..b86cf63f0 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -16,8 +16,9 @@ #include #include "include/org_rocksdb_RocksDB.h" -#include "rocksdb/db.h" #include "rocksdb/cache.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" #include "rocksdb/types.h" #include "rocksjni/portal.h" @@ -1757,17 +1758,6 @@ void add_file_helper(JNIEnv* env, const jobjectArray& jfile_path_list, } } -void add_file_helper( - JNIEnv* env, jlongArray jfi_handle_list, int fi_handle_list_len, - std::vector* file_info_list) { - jlong* jfih = env->GetLongArrayElements(jfi_handle_list, NULL); - for (int i = 0; i < fi_handle_list_len; i++) { - auto* file_info = 
- reinterpret_cast(*(jfih + i)); - file_info_list->push_back(*file_info); - } -} - /* * Class: org_rocksdb_RocksDB * Method: addFile @@ -1783,32 +1773,15 @@ void Java_org_rocksdb_RocksDB_addFile__JJ_3Ljava_lang_String_2IZ( &file_path_list); auto* column_family = reinterpret_cast(jcf_handle); + rocksdb::IngestExternalFileOptions ifo; + ifo.move_files = static_cast(jmove_file); + ifo.snapshot_consistency = true; + ifo.allow_global_seqno = false; + ifo.allow_blocking_flush = false; rocksdb::Status s = - db->AddFile(column_family, file_path_list, static_cast(jmove_file)); + db->IngestExternalFile(column_family, file_path_list, ifo); if (!s.ok()) { rocksdb::RocksDBExceptionJni::ThrowNew(env, s); } } -/* - * Class: org_rocksdb_RocksDB - * Method: addFile - * Signature: (JJ[JIZ)V - */ -void Java_org_rocksdb_RocksDB_addFile__JJ_3JIZ( - JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jcf_handle, - jlongArray jfile_info_handle_list, jint jfile_info_handle_list_len, - jboolean jmove_file) { - auto* db = reinterpret_cast(jdb_handle); - std::vector file_info_list; - add_file_helper(env, jfile_info_handle_list, - static_cast(jfile_info_handle_list_len), - &file_info_list); - auto* column_family = - reinterpret_cast(jcf_handle); - rocksdb::Status s = - db->AddFile(column_family, file_info_list, static_cast(jmove_file)); - if (!s.ok()) { - rocksdb::RocksDBExceptionJni::ThrowNew(env, s); - } -} diff --git a/src.mk b/src.mk index 6c21b557d..533b6c61d 100644 --- a/src.mk +++ b/src.mk @@ -17,9 +17,9 @@ LIB_SOURCES = \ db/db_impl_debug.cc \ db/db_impl_readonly.cc \ db/db_impl_experimental.cc \ - db/db_impl_add_file.cc \ db/db_info_dumper.cc \ db/db_iter.cc \ + db/external_sst_file_ingestion_job.cc \ db/experimental.cc \ db/event_helpers.cc \ db/file_indexer.cc \