diff --git a/CMakeLists.txt b/CMakeLists.txt index 65904b8ca..b49a13572 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -520,6 +520,7 @@ set(SOURCES db/flush_job.cc db/flush_scheduler.cc db/forward_iterator.cc + db/import_column_family_job.cc db/internal_stats.cc db/logs_with_prep_tracker.cc db/log_reader.cc diff --git a/Makefile b/Makefile index 100f160ca..f8a904bd3 100644 --- a/Makefile +++ b/Makefile @@ -500,6 +500,7 @@ TESTS = \ plain_table_db_test \ comparator_db_test \ external_sst_file_test \ + import_column_family_test \ prefix_test \ skiplist_test \ write_buffer_manager_test \ @@ -577,6 +578,7 @@ PARALLEL_TEST = \ db_universal_compaction_test \ db_wal_test \ external_sst_file_test \ + import_column_family_test \ fault_injection_test \ inlineskiplist_test \ manual_compaction_test \ @@ -1274,6 +1276,9 @@ external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util. external_sst_file_test: db/external_sst_file_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +import_column_family_test: db/import_column_family_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index eda105139..cfd9ef73d 100644 --- a/TARGETS +++ b/TARGETS @@ -113,6 +113,7 @@ cpp_library( "db/flush_job.cc", "db/flush_scheduler.cc", "db/forward_iterator.cc", + "db/import_column_family_job.cc", "db/internal_stats.cc", "db/log_reader.cc", "db/log_writer.cc", diff --git a/db/compacted_db_impl.h b/db/compacted_db_impl.h index c1b8da9a7..e71ce2494 100644 --- a/db/compacted_db_impl.h +++ b/db/compacted_db_impl.h @@ -85,6 +85,15 @@ class CompactedDBImpl : public DBImpl { const IngestExternalFileOptions& /*ingestion_options*/) override { return Status::NotSupported("Not supported in compacted db mode."); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const 
ColumnFamilyOptions& /*options*/, + const std::string& /*column_family_name*/, + const ImportColumnFamilyOptions& /*import_options*/, + const ExportImportFilesMetaData& /*metadata*/, + ColumnFamilyHandle** /*handle*/) override { + return Status::NotSupported("Not supported in compacted db mode."); + } private: friend class DB; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 6f2ebdc80..af9aea011 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -33,6 +33,7 @@ #include "db/error_handler.h" #include "db/event_helpers.h" #include "db/external_sst_file_ingestion_job.h" +#include "db/import_column_family_job.h" #include "db/flush_job.h" #include "db/forward_iterator.h" #include "db/job_context.h" @@ -3894,6 +3895,127 @@ Status DBImpl::IngestExternalFiles( return status; } +Status DBImpl::CreateColumnFamilyWithImport( + const ColumnFamilyOptions& options, const std::string& column_family_name, + const ImportColumnFamilyOptions& import_options, + const ExportImportFilesMetaData& metadata, + ColumnFamilyHandle** handle) { + assert(handle != nullptr); + assert(*handle == nullptr); + std::string cf_comparator_name = options.comparator->Name(); + if (cf_comparator_name != metadata.db_comparator_name) { + return Status::InvalidArgument("Comparator name mismatch"); + } + + // Create column family. + auto status = CreateColumnFamily(options, column_family_name, handle); + if (!status.ok()) { + return status; + } + + // Import sst files from metadata. 
+ auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(*handle); + auto cfd = cfh->cfd(); + ImportColumnFamilyJob import_job(env_, versions_.get(), cfd, + immutable_db_options_, env_options_, + import_options, metadata.files); + + SuperVersionContext dummy_sv_ctx(/* create_superversion */ true); + VersionEdit dummy_edit; + uint64_t next_file_number = 0; + std::list<uint64_t>::iterator pending_output_elem; + { + // Lock db mutex + InstrumentedMutexLock l(&mutex_); + if (error_handler_.IsDBStopped()) { + // Don't import files when there is a bg_error + status = error_handler_.GetBGError(); + } + + // Make sure that bg cleanup won't delete the files that we are importing + pending_output_elem = CaptureCurrentFileNumberInPendingOutputs(); + + if (status.ok()) { + // If a crash happens after a hard link is established, Recover function may + // reuse the file number that has already been assigned to the internal file, + // and this will overwrite the external file. To protect the external + // file, we have to make sure the file number will never be reused.
+ next_file_number = + versions_->FetchAddFileNumber(metadata.files.size()); + auto cf_options = cfd->GetLatestMutableCFOptions(); + status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_, + directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options); + } + } + } + dummy_sv_ctx.Clean(); + + if (status.ok()) { + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + status = import_job.Prepare(next_file_number, sv); + CleanupSuperVersion(sv); + } + + if (status.ok()) { + SuperVersionContext sv_context(true /*create_superversion*/); + { + // Lock db mutex + InstrumentedMutexLock l(&mutex_); + + // Stop writes to the DB by entering both write threads + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + + num_running_ingest_file_++; + assert(!cfd->IsDropped()); + status = import_job.Run(); + + // Install job edit [Mutex will be unlocked here] + if (status.ok()) { + auto cf_options = cfd->GetLatestMutableCFOptions(); + status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(), + &mutex_, directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options); + } + } + + // Resume writes to the DB + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + write_thread_.ExitUnbatched(&w); + + num_running_ingest_file_--; + if (num_running_ingest_file_ == 0) { + bg_cv_.SignalAll(); + } + } + // mutex_ is unlocked here + + sv_context.Clean(); + } + + { + InstrumentedMutexLock l(&mutex_); + ReleaseFileNumberFromPendingOutputs(pending_output_elem); + } + + import_job.Cleanup(status); + if (!status.ok()) { + DropColumnFamily(*handle); + DestroyColumnFamilyHandle(*handle); + *handle = nullptr; + } + return status; +} + Status DBImpl::VerifyChecksum() { Status s; std::vector cfd_list; 
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index d417035b1..547e3e1d6 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -27,6 +27,7 @@ #include "db/external_sst_file_ingestion_job.h" #include "db/flush_job.h" #include "db/flush_scheduler.h" +#include "db/import_column_family_job.h" #include "db/internal_stats.h" #include "db/log_writer.h" #include "db/logs_with_prep_tracker.h" @@ -356,6 +357,13 @@ class DBImpl : public DB { virtual Status IngestExternalFiles( const std::vector<IngestExternalFileArg>& args) override; + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& options, const std::string& column_family_name, + const ImportColumnFamilyOptions& import_options, + const ExportImportFilesMetaData& metadata, + ColumnFamilyHandle** handle) override; + virtual Status VerifyChecksum() override; using DB::StartTrace; @@ -1803,7 +1811,8 @@ class DBImpl : public DB { std::string db_absolute_path_; - // Number of running IngestExternalFile() calls. + // Number of running IngestExternalFile() or CreateColumnFamilyWithImport() + // calls.
// REQUIRES: mutex held int num_running_ingest_file_; diff --git a/db/db_impl/db_impl_readonly.h b/db/db_impl/db_impl_readonly.h index 18df900cb..ad307677c 100644 --- a/db/db_impl/db_impl_readonly.h +++ b/db/db_impl/db_impl_readonly.h @@ -115,6 +115,16 @@ class DBImplReadOnly : public DBImpl { return Status::NotSupported("Not supported operation in read only mode."); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& /*options*/, + const std::string& /*column_family_name*/, + const ImportColumnFamilyOptions& /*import_options*/, + const ExportImportFilesMetaData& /*metadata*/, + ColumnFamilyHandle** /*handle*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + private: friend class DB; diff --git a/db/db_test.cc b/db/db_test.cc index 69e91923c..36bdda59e 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2492,6 +2492,16 @@ class ModelDB : public DB { return Status::NotSupported("Not implemented"); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& /*options*/, + const std::string& /*column_family_name*/, + const ImportColumnFamilyOptions& /*import_options*/, + const ExportImportFilesMetaData& /*metadata*/, + ColumnFamilyHandle** /*handle*/) override { + return Status::NotSupported("Not implemented."); + } + Status VerifyChecksum() override { return Status::NotSupported("Not implemented."); } diff --git a/db/import_column_family_job.cc b/db/import_column_family_job.cc new file mode 100644 index 000000000..3c00a2591 --- /dev/null +++ b/db/import_column_family_job.cc @@ -0,0 +1,257 @@ +#ifndef ROCKSDB_LITE + +#include "db/import_column_family_job.h" + +#include +#include +#include +#include + +#include "db/version_edit.h" +#include "file/file_util.h" +#include "table/merging_iterator.h" +#include "table/scoped_arena_iterator.h" +#include "table/sst_file_writer_collectors.h" +#include 
"table/table_builder.h" +#include "util/file_reader_writer.h" +#include "util/stop_watch.h" + +namespace rocksdb { + +Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number, + SuperVersion* sv) { + Status status; + + // Read the information of files we are importing + for (const auto& file_metadata : metadata_) { + const auto file_path = file_metadata.db_path + "/" + file_metadata.name; + IngestedFileInfo file_to_import; + status = GetIngestedFileInfo(file_path, &file_to_import, sv); + if (!status.ok()) { + return status; + } + files_to_import_.push_back(file_to_import); + } + + const auto ucmp = cfd_->internal_comparator().user_comparator(); + auto num_files = files_to_import_.size(); + if (num_files == 0) { + return Status::InvalidArgument("The list of files is empty"); + } else if (num_files > 1) { + // Verify that passed files don't have overlapping ranges in any particular + // level. + int min_level = 1; // Check for overlaps in Level 1 and above. + int max_level = -1; + for (const auto& file_metadata : metadata_) { + if (file_metadata.level > max_level) { + max_level = file_metadata.level; + } + } + for (int level = min_level; level <= max_level; ++level) { + autovector sorted_files; + for (size_t i = 0; i < num_files; i++) { + if (metadata_[i].level == level) { + sorted_files.push_back(&files_to_import_[i]); + } + } + + std::sort(sorted_files.begin(), sorted_files.end(), + [&ucmp](const IngestedFileInfo* info1, + const IngestedFileInfo* info2) { + return ucmp->Compare(info1->smallest_user_key, + info2->smallest_user_key) < 0; + }); + + for (size_t i = 0; i < sorted_files.size() - 1; i++) { + if (ucmp->Compare(sorted_files[i]->largest_user_key, + sorted_files[i + 1]->smallest_user_key) >= 0) { + return Status::InvalidArgument("Files have overlapping ranges"); + } + } + } + } + + for (const auto& f : files_to_import_) { + if (f.num_entries == 0) { + return Status::InvalidArgument("File contain no entries"); + } + + if 
(!f.smallest_internal_key().Valid() || + !f.largest_internal_key().Valid()) { + return Status::Corruption("File has corrupted keys"); + } + } + + // Copy/Move external files into DB + auto hardlink_files = import_options_.move_files; + for (auto& f : files_to_import_) { + f.fd = FileDescriptor(next_file_number++, 0, f.file_size); + + const auto path_outside_db = f.external_file_path; + const auto path_inside_db = TableFileName( + cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId()); + + if (hardlink_files) { + status = env_->LinkFile(path_outside_db, path_inside_db); + if (status.IsNotSupported()) { + // Original file is on a different FS, use copy instead of hard linking + hardlink_files = false; + } + } + if (!hardlink_files) { + status = CopyFile(env_, path_outside_db, path_inside_db, 0, + db_options_.use_fsync); + } + if (!status.ok()) { + break; + } + f.copy_file = !hardlink_files; + f.internal_file_path = path_inside_db; + } + + if (!status.ok()) { + // We failed, remove all files that we copied into the db + for (const auto& f : files_to_import_) { + if (f.internal_file_path.empty()) { + break; + } + const auto s = env_->DeleteFile(f.internal_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + f.internal_file_path.c_str(), s.ToString().c_str()); + } + } + } + + return status; +} + +// REQUIRES: we have become the only writer by entering both write_thread_ and +// nonmem_write_thread_ +Status ImportColumnFamilyJob::Run() { + Status status; + edit_.SetColumnFamily(cfd_->GetID()); + + for (size_t i = 0; i < files_to_import_.size(); ++i) { + const auto& f = files_to_import_[i]; + const auto& file_metadata = metadata_[i]; + edit_.AddFile(file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(), + f.fd.GetFileSize(), f.smallest_internal_key(), + f.largest_internal_key(), file_metadata.smallest_seqno, + file_metadata.largest_seqno, false); + + // If incoming sequence number is higher, 
update local sequence number. + if (file_metadata.largest_seqno > versions_->LastSequence()) { + versions_->SetLastAllocatedSequence(file_metadata.largest_seqno); + versions_->SetLastPublishedSequence(file_metadata.largest_seqno); + versions_->SetLastSequence(file_metadata.largest_seqno); + } + } + + return status; +} + +void ImportColumnFamilyJob::Cleanup(const Status& status) { + if (!status.ok()) { + // We failed to add files to the database remove all the files we copied. + for (const auto& f : files_to_import_) { + const auto s = env_->DeleteFile(f.internal_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + f.internal_file_path.c_str(), s.ToString().c_str()); + } + } + } else if (status.ok() && import_options_.move_files) { + // The files were moved and added successfully, remove original file links + for (IngestedFileInfo& f : files_to_import_) { + const auto s = env_->DeleteFile(f.external_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN( + db_options_.info_log, + "%s was added to DB successfully but failed to remove original " + "file link : %s", + f.external_file_path.c_str(), s.ToString().c_str()); + } + } + } +} + +Status ImportColumnFamilyJob::GetIngestedFileInfo( + const std::string& external_file, IngestedFileInfo* file_to_import, + SuperVersion* sv) { + file_to_import->external_file_path = external_file; + + // Get external file size + auto status = env_->GetFileSize(external_file, &file_to_import->file_size); + if (!status.ok()) { + return status; + } + + // Create TableReader for external file + std::unique_ptr table_reader; + std::unique_ptr sst_file; + std::unique_ptr sst_file_reader; + + status = env_->NewRandomAccessFile(external_file, &sst_file, env_options_); + if (!status.ok()) { + return status; + } + sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file), + external_file)); + + status = cfd_->ioptions()->table_factory->NewTableReader( + 
TableReaderOptions(*cfd_->ioptions(), + sv->mutable_cf_options.prefix_extractor.get(), + env_options_, cfd_->internal_comparator()), + std::move(sst_file_reader), file_to_import->file_size, &table_reader); + if (!status.ok()) { + return status; + } + + // Get the external file properties + auto props = table_reader->GetTableProperties(); + + // Set original_seqno to 0. + file_to_import->original_seqno = 0; + + // Get number of entries in table + file_to_import->num_entries = props->num_entries; + + ParsedInternalKey key; + ReadOptions ro; + // During reading the external file we can cache blocks that we read into + // the block cache, if we later change the global seqno of this file, we will + // have block in cache that will include keys with wrong seqno. + // We need to disable fill_cache so that we read from the file without + // updating the block cache. + ro.fill_cache = false; + std::unique_ptr iter(table_reader->NewIterator( + ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion)); + + // Get first (smallest) key from file + iter->SeekToFirst(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + file_to_import->smallest_user_key = key.user_key.ToString(); + + // Get last (largest) key from file + iter->SeekToLast(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + file_to_import->largest_user_key = key.user_key.ToString(); + + file_to_import->cf_id = static_cast(props->column_family_id); + + file_to_import->table_properties = *props; + + return status; +} + +} // namespace rocksdb + +#endif // !ROCKSDB_LITE diff --git a/db/import_column_family_job.h b/db/import_column_family_job.h new file mode 100644 index 000000000..5b8577df1 --- /dev/null +++ b/db/import_column_family_job.h @@ -0,0 +1,70 @@ +#pragma once +#include +#include +#include + 
+#include "db/column_family.h" +#include "db/dbformat.h" +#include "db/external_sst_file_ingestion_job.h" +#include "db/snapshot_impl.h" +#include "options/db_options.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/metadata.h" +#include "rocksdb/sst_file_writer.h" +#include "util/autovector.h" + +namespace rocksdb { + +// Imports a set of sst files as is into a new column family. Logic is similar +// to ExternalSstFileIngestionJob. +class ImportColumnFamilyJob { + public: + ImportColumnFamilyJob( + Env* env, VersionSet* versions, ColumnFamilyData* cfd, + const ImmutableDBOptions& db_options, const EnvOptions& env_options, + const ImportColumnFamilyOptions& import_options, + const std::vector& metadata) + : env_(env), + versions_(versions), + cfd_(cfd), + db_options_(db_options), + env_options_(env_options), + import_options_(import_options), + metadata_(metadata) {} + + // Prepare the job by copying external files into the DB. + Status Prepare(uint64_t next_file_number, SuperVersion* sv); + + // Will execute the import job and prepare edit() to be applied. + // REQUIRES: Mutex held + Status Run(); + + // Cleanup after successful/failed job + void Cleanup(const Status& status); + + VersionEdit* edit() { return &edit_; } + + const autovector& files_to_import() const { + return files_to_import_; + } + + private: + // Open the external file and populate `file_to_import` with all the + // external information we need to import this file. 
+ Status GetIngestedFileInfo(const std::string& external_file, + IngestedFileInfo* file_to_import, + SuperVersion* sv); + + Env* env_; + VersionSet* versions_; + ColumnFamilyData* cfd_; + const ImmutableDBOptions& db_options_; + const EnvOptions& env_options_; + autovector files_to_import_; + VersionEdit edit_; + const ImportColumnFamilyOptions& import_options_; + std::vector metadata_; +}; + +} // namespace rocksdb diff --git a/db/import_column_family_test.cc b/db/import_column_family_test.cc new file mode 100644 index 000000000..a93ecbf11 --- /dev/null +++ b/db/import_column_family_test.cc @@ -0,0 +1,565 @@ +#ifndef ROCKSDB_LITE + +#include +#include "db/db_test_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/sst_file_writer.h" +#include "test_util/testutil.h" + +namespace rocksdb { + +class ImportColumnFamilyTest : public DBTestBase { + public: + ImportColumnFamilyTest() : DBTestBase("/import_column_family_test") { + sst_files_dir_ = dbname_ + "/sst_files/"; + DestroyAndRecreateExternalSSTFilesDir(); + export_files_dir_ = test::TmpDir(env_) + "/export"; + import_cfh_ = nullptr; + import_cfh2_ = nullptr; + metadata_ptr_ = nullptr; + } + + ~ImportColumnFamilyTest() { + if (import_cfh_) { + db_->DropColumnFamily(import_cfh_); + db_->DestroyColumnFamilyHandle(import_cfh_); + import_cfh_ = nullptr; + } + if (import_cfh2_) { + db_->DropColumnFamily(import_cfh2_); + db_->DestroyColumnFamilyHandle(import_cfh2_); + import_cfh2_ = nullptr; + } + if (metadata_ptr_) { + delete metadata_ptr_; + metadata_ptr_ = nullptr; + } + test::DestroyDir(env_, sst_files_dir_); + test::DestroyDir(env_, export_files_dir_); + } + + void DestroyAndRecreateExternalSSTFilesDir() { + test::DestroyDir(env_, sst_files_dir_); + env_->CreateDir(sst_files_dir_); + test::DestroyDir(env_, export_files_dir_); + } + + LiveFileMetaData LiveFileMetaDataInit(std::string name, + std::string path, + int level, + SequenceNumber smallest_seqno, + SequenceNumber largest_seqno) 
{ + LiveFileMetaData metadata; + metadata.name = name; + metadata.db_path = path; + metadata.smallest_seqno = smallest_seqno; + metadata.largest_seqno = largest_seqno; + metadata.level = level; + return metadata; + } + + protected: + std::string sst_files_dir_; + std::string export_files_dir_; + ColumnFamilyHandle* import_cfh_; + ColumnFamilyHandle* import_cfh2_; + ExportImportFilesMetaData *metadata_ptr_; +}; + +TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFiles) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + SstFileWriter sfw_unknown(EnvOptions(), options); + + // cf1.sst + const std::string cf1_sst_name = "cf1.sst"; + const std::string cf1_sst = sst_files_dir_ + cf1_sst_name; + ASSERT_OK(sfw_cf1.Open(cf1_sst)); + ASSERT_OK(sfw_cf1.Put("K1", "V1")); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Finish()); + + // cf_unknown.sst + const std::string unknown_sst_name = "cf_unknown.sst"; + const std::string unknown_sst = sst_files_dir_ + unknown_sst_name; + ASSERT_OK(sfw_unknown.Open(unknown_sst)); + ASSERT_OK(sfw_unknown.Put("K3", "V1")); + ASSERT_OK(sfw_unknown.Put("K4", "V2")); + ASSERT_OK(sfw_unknown.Finish()); + + { + // Import sst file corresponding to cf1 onto a new cf and verify + ExportImportFilesMetaData metadata; + metadata.files.push_back( + LiveFileMetaDataInit(cf1_sst_name, sst_files_dir_, 0, 10, 19)); + metadata.db_comparator_name = options.comparator->Name(); + + ASSERT_OK(db_->CreateColumnFamilyWithImport( + options, "toto", ImportColumnFamilyOptions(), metadata, &import_cfh_)); + ASSERT_NE(import_cfh_, nullptr); + + std::string value; + db_->Get(ReadOptions(), import_cfh_, "K1", &value); + ASSERT_EQ(value, "V1"); + db_->Get(ReadOptions(), import_cfh_, "K2", &value); + ASSERT_EQ(value, "V2"); + ASSERT_OK(db_->DropColumnFamily(import_cfh_)); + ASSERT_OK(db_->DestroyColumnFamilyHandle(import_cfh_)); + import_cfh_ = nullptr; + } + + { + 
// Import sst file corresponding to unknown cf onto a new cf and verify + ExportImportFilesMetaData metadata; + metadata.files.push_back( + LiveFileMetaDataInit(unknown_sst_name, sst_files_dir_, 0, 20, 29)); + metadata.db_comparator_name = options.comparator->Name(); + + ASSERT_OK(db_->CreateColumnFamilyWithImport( + options, "yoyo", ImportColumnFamilyOptions(), metadata, &import_cfh_)); + ASSERT_NE(import_cfh_, nullptr); + + std::string value; + db_->Get(ReadOptions(), import_cfh_, "K3", &value); + ASSERT_EQ(value, "V1"); + db_->Get(ReadOptions(), import_cfh_, "K4", &value); + ASSERT_EQ(value, "V2"); + } +} + +TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFilesWithOverlap) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + + // file3.sst + const std::string file3_sst_name = "file3.sst"; + const std::string file3_sst = sst_files_dir_ + file3_sst_name; + ASSERT_OK(sfw_cf1.Open(file3_sst)); + for (int i = 0; i < 100; ++i) { + sfw_cf1.Put(Key(i), Key(i) + "_val"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file2.sst + const std::string file2_sst_name = "file2.sst"; + const std::string file2_sst = sst_files_dir_ + file2_sst_name; + ASSERT_OK(sfw_cf1.Open(file2_sst)); + for (int i = 0; i < 100; i += 2) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite1"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file1a.sst + const std::string file1a_sst_name = "file1a.sst"; + const std::string file1a_sst = sst_files_dir_ + file1a_sst_name; + ASSERT_OK(sfw_cf1.Open(file1a_sst)); + for (int i = 0; i < 52; i += 4) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite2"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file1b.sst + const std::string file1b_sst_name = "file1b.sst"; + const std::string file1b_sst = sst_files_dir_ + file1b_sst_name; + ASSERT_OK(sfw_cf1.Open(file1b_sst)); + for (int i = 52; i < 100; i += 4) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite2"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // 
file0a.sst + const std::string file0a_sst_name = "file0a.sst"; + const std::string file0a_sst = sst_files_dir_ + file0a_sst_name; + ASSERT_OK(sfw_cf1.Open(file0a_sst)); + for (int i = 0; i < 100; i += 16) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite3"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file0b.sst + const std::string file0b_sst_name = "file0b.sst"; + const std::string file0b_sst = sst_files_dir_ + file0b_sst_name; + ASSERT_OK(sfw_cf1.Open(file0b_sst)); + for (int i = 0; i < 100; i += 16) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite4"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // Import sst files and verify + ExportImportFilesMetaData metadata; + metadata.files.push_back( + LiveFileMetaDataInit(file3_sst_name, sst_files_dir_, 3, 10, 19)); + metadata.files.push_back( + LiveFileMetaDataInit(file2_sst_name, sst_files_dir_, 2, 20, 29)); + metadata.files.push_back( + LiveFileMetaDataInit(file1a_sst_name, sst_files_dir_, 1, 30, 34)); + metadata.files.push_back( + LiveFileMetaDataInit(file1b_sst_name, sst_files_dir_, 1, 35, 39)); + metadata.files.push_back( + LiveFileMetaDataInit(file0a_sst_name, sst_files_dir_, 0, 40, 49)); + metadata.files.push_back( + LiveFileMetaDataInit(file0b_sst_name, sst_files_dir_, 0, 50, 59)); + metadata.db_comparator_name = options.comparator->Name(); + + ASSERT_OK(db_->CreateColumnFamilyWithImport( + options, "toto", ImportColumnFamilyOptions(), metadata, &import_cfh_)); + ASSERT_NE(import_cfh_, nullptr); + + for (int i = 0; i < 100; i++) { + std::string value; + db_->Get(ReadOptions(), import_cfh_, Key(i), &value); + if (i % 16 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite4"); + } else if (i % 4 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite2"); + } else if (i % 2 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite1"); + } else { + ASSERT_EQ(value, Key(i) + "_val"); + } + } + + for (int i = 0; i < 100; i += 5) { + ASSERT_OK( + db_->Put(WriteOptions(), import_cfh_, Key(i), Key(i) + "_overwrite5")); + } + + // Flush and check again + 
ASSERT_OK(db_->Flush(FlushOptions(), import_cfh_)); + for (int i = 0; i < 100; i++) { + std::string value; + db_->Get(ReadOptions(), import_cfh_, Key(i), &value); + if (i % 5 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite5"); + } else if (i % 16 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite4"); + } else if (i % 4 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite2"); + } else if (i % 2 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite1"); + } else { + ASSERT_EQ(value, Key(i) + "_val"); + } + } + + // Compact and check again. + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), import_cfh_, nullptr, nullptr)); + for (int i = 0; i < 100; i++) { + std::string value; + db_->Get(ReadOptions(), import_cfh_, Key(i), &value); + if (i % 5 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite5"); + } else if (i % 16 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite4"); + } else if (i % 4 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite2"); + } else if (i % 2 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite1"); + } else { + ASSERT_EQ(value, Key(i) + "_val"); + } + } +} + +TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherCF) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_val"); + } + ASSERT_OK(Flush(1)); + + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr)); + + // Overwrite the value in the same set of keys. + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_overwrite"); + } + + // Flush to create L0 file. + ASSERT_OK(Flush(1)); + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_overwrite2"); + } + + // Flush again to create another L0 file. It should have higher sequencer. 
+ ASSERT_OK(Flush(1)); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_, + &metadata_ptr_)); + ASSERT_NE(metadata_ptr_, nullptr); + + ImportColumnFamilyOptions import_options; + import_options.move_files = false; + ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "toto", import_options, + *metadata_ptr_, &import_cfh_)); + ASSERT_NE(import_cfh_, nullptr); + + import_options.move_files = true; + ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "yoyo", import_options, + *metadata_ptr_, &import_cfh2_)); + ASSERT_NE(import_cfh2_, nullptr); + delete metadata_ptr_; + metadata_ptr_ = NULL; + + std::string value1, value2; + + for (int i = 0; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1); + ASSERT_EQ(Get(1, Key(i)), value1); + } + + for (int i = 0; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2); + ASSERT_EQ(Get(1, Key(i)), value2); + } + + // Modify keys in cf1 and verify. + for (int i = 0; i < 25; i++) { + ASSERT_OK(db_->Delete(WriteOptions(), import_cfh_, Key(i))); + } + for (int i = 25; i < 50; i++) { + ASSERT_OK( + db_->Put(WriteOptions(), import_cfh_, Key(i), Key(i) + "_overwrite3")); + } + for (int i = 0; i < 25; ++i) { + ASSERT_TRUE( + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1).IsNotFound()); + } + for (int i = 25; i < 50; ++i) { + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1); + ASSERT_EQ(Key(i) + "_overwrite3", value1); + } + for (int i = 50; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1); + ASSERT_EQ(Key(i) + "_overwrite2", value1); + } + + for (int i = 0; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2); + ASSERT_EQ(Get(1, Key(i)), value2); + } + + // Compact and check again. 
+ ASSERT_OK(db_->Flush(FlushOptions(), import_cfh_)); + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), import_cfh_, nullptr, nullptr)); + + for (int i = 0; i < 25; ++i) { + ASSERT_TRUE( + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1).IsNotFound()); + } + for (int i = 25; i < 50; ++i) { + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1); + ASSERT_EQ(Key(i) + "_overwrite3", value1); + } + for (int i = 50; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1); + ASSERT_EQ(Key(i) + "_overwrite2", value1); + } + + for (int i = 0; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2); + ASSERT_EQ(Get(1, Key(i)), value2); + } +} + +TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherDB) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_val"); + } + ASSERT_OK(Flush(1)); + + // Compact to create a L1 file. + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr)); + + // Overwrite the value in the same set of keys. + for (int i = 0; i < 50; ++i) { + Put(1, Key(i), Key(i) + "_overwrite"); + } + + // Flush to create L0 file. + ASSERT_OK(Flush(1)); + + for (int i = 0; i < 25; ++i) { + Put(1, Key(i), Key(i) + "_overwrite2"); + } + + // Flush again to create another L0 file. It should have higher sequencer. + ASSERT_OK(Flush(1)); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_, + &metadata_ptr_)); + ASSERT_NE(metadata_ptr_, nullptr); + + // Create a new db and import the files. 
+ DB* db_copy; + test::DestroyDir(env_, dbname_ + "/db_copy"); + ASSERT_OK(DB::Open(options, dbname_ + "/db_copy", &db_copy)); + ColumnFamilyHandle* cfh = nullptr; + ASSERT_OK(db_copy->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + *metadata_ptr_, &cfh)); + ASSERT_NE(cfh, nullptr); + + for (int i = 0; i < 100; ++i) { + std::string value; + db_copy->Get(ReadOptions(), cfh, Key(i), &value); + ASSERT_EQ(Get(1, Key(i)), value); + } + db_copy->DropColumnFamily(cfh); + test::DestroyDir(env_, dbname_ + "/db_copy"); +} + +TEST_F(ImportColumnFamilyTest, ImportColumnFamilyNegativeTest) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + { + // Create column family with existing cf name. + ExportImportFilesMetaData metadata; + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "koko", + ImportColumnFamilyOptions(), + metadata, &import_cfh_), + Status::InvalidArgument("Column family already exists")); + ASSERT_EQ(import_cfh_, nullptr); + } + + { + // Import with no files specified. + ExportImportFilesMetaData metadata; + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + metadata, &import_cfh_), + Status::InvalidArgument("The list of files is empty")); + ASSERT_EQ(import_cfh_, nullptr); + } + + { + // Import with overlapping keys in sst files. 
+ ExportImportFilesMetaData metadata; + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + const std::string file1_sst_name = "file1.sst"; + const std::string file1_sst = sst_files_dir_ + file1_sst_name; + ASSERT_OK(sfw_cf1.Open(file1_sst)); + ASSERT_OK(sfw_cf1.Put("K1", "V1")); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Finish()); + const std::string file2_sst_name = "file2.sst"; + const std::string file2_sst = sst_files_dir_ + file2_sst_name; + ASSERT_OK(sfw_cf1.Open(file2_sst)); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Put("K3", "V3")); + ASSERT_OK(sfw_cf1.Finish()); + + metadata.files.push_back( + LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.files.push_back( + LiveFileMetaDataInit(file2_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.db_comparator_name = options.comparator->Name(); + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + metadata, &import_cfh_), + Status::InvalidArgument("Files have overlapping ranges")); + ASSERT_EQ(import_cfh_, nullptr); + } + + { + // Import with a mismatching comparator, should fail with appropriate error. 
+ ExportImportFilesMetaData metadata; + Options mismatch_options = CurrentOptions(); + mismatch_options.comparator = ReverseBytewiseComparator(); + SstFileWriter sfw_cf1(EnvOptions(), mismatch_options, handles_[1]); + const std::string file1_sst_name = "file1.sst"; + const std::string file1_sst = sst_files_dir_ + file1_sst_name; + ASSERT_OK(sfw_cf1.Open(file1_sst)); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Put("K1", "V1")); + ASSERT_OK(sfw_cf1.Finish()); + + metadata.files.push_back( + LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.db_comparator_name = mismatch_options.comparator->Name(); + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "coco", + ImportColumnFamilyOptions(), + metadata, &import_cfh_), + Status::InvalidArgument("Comparator name mismatch")); + ASSERT_EQ(import_cfh_, nullptr); + } + + { + // Import with non existent sst file should fail with appropriate error + ExportImportFilesMetaData metadata; + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + const std::string file1_sst_name = "file1.sst"; + const std::string file1_sst = sst_files_dir_ + file1_sst_name; + ASSERT_OK(sfw_cf1.Open(file1_sst)); + ASSERT_OK(sfw_cf1.Put("K1", "V1")); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Finish()); + const std::string file3_sst_name = "file3.sst"; + + metadata.files.push_back( + LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.files.push_back( + LiveFileMetaDataInit(file3_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.db_comparator_name = options.comparator->Name(); + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + metadata, &import_cfh_), + Status::IOError("No such file or directory")); + ASSERT_EQ(import_cfh_, nullptr); + + // Test successful import after a failure with the same CF name. 
Ensures
+    // there is no side effect with CF when there is a failed import
+    metadata.files.pop_back();
+    metadata.db_comparator_name = options.comparator->Name();
+
+    ASSERT_OK(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
+                                                ImportColumnFamilyOptions(),
+                                                metadata, &import_cfh_));
+    ASSERT_NE(import_cfh_, nullptr);
+  }
+
+}
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  rocksdb::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main(int argc, char** argv) {
+  fprintf(stderr,
+          "SKIPPED as External SST File Writer and Import are not supported "
+          "in ROCKSDB_LITE\n");
+  return 0;
+}
+
+#endif  // !ROCKSDB_LITE
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 0f8573e43..d90ca900f 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -1174,6 +1174,27 @@ class DB {
   virtual Status IngestExternalFiles(
       const std::vector<IngestExternalFileArg>& args) = 0;
+  // CreateColumnFamilyWithImport() will create a new column family with
+  // column_family_name and import external SST files specified in metadata into
+  // this column family.
+  // (1) External SST files can be created using SstFileWriter.
+  // (2) External SST files can be exported from a particular column family in
+  //     an existing DB.
+  // Option in import_options specifies whether the external files are copied or
+  // moved (default is copy). When option specifies copy, managing files at
+  // external_file_path is caller's responsibility. When option specifies a
+  // move, the call ensures that the specified files at external_file_path are
+  // deleted on successful return and files are not modified on any error
+  // return.
+  // On error return, column family handle returned will be nullptr.
+  // ColumnFamily will be present on successful return and will not be present
+  // on error return. ColumnFamily may be present on any crash during this call.
+  virtual Status CreateColumnFamilyWithImport(
+      const ColumnFamilyOptions& options, const std::string& column_family_name,
+      const ImportColumnFamilyOptions& import_options,
+      const ExportImportFilesMetaData& metadata,
+      ColumnFamilyHandle** handle) = 0;
+
   virtual Status VerifyChecksum() = 0;
   // AddFile() is deprecated, please use IngestExternalFile()
diff --git a/include/rocksdb/metadata.h b/include/rocksdb/metadata.h
index a0ab41efd..7b251eb72 100644
--- a/include/rocksdb/metadata.h
+++ b/include/rocksdb/metadata.h
@@ -108,4 +108,11 @@ struct LiveFileMetaData : SstFileMetaData {
   int level;  // Level at which this file resides.
   LiveFileMetaData() : column_family_name(), level(0) {}
 };
+
+// Metadata returned as output from ExportColumnFamily() and used as input to
+// CreateColumnFamiliesWithImport().
+struct ExportImportFilesMetaData {
+  std::string db_comparator_name;       // Used to safety check at import.
+  std::vector<LiveFileMetaData> files;  // Vector of file metadata.
+};
 }  // namespace rocksdb
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 8ebcd292d..09dc8e54c 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1491,4 +1491,10 @@ struct TraceOptions {
   uint64_t filter = kTraceFilterNone;
 };
+
+// ImportColumnFamilyOptions is used by ImportColumnFamily()
+struct ImportColumnFamilyOptions {
+  // Can be set to true to move the files instead of copying them.
+  bool move_files = false;
+};
+
 }  // namespace rocksdb
diff --git a/include/rocksdb/utilities/checkpoint.h b/include/rocksdb/utilities/checkpoint.h
index aa0a394d4..5f12922c4 100644
--- a/include/rocksdb/utilities/checkpoint.h
+++ b/include/rocksdb/utilities/checkpoint.h
@@ -9,11 +9,15 @@
 #ifndef ROCKSDB_LITE
 #include <string>
+#include <vector>
 #include "rocksdb/status.h"
 namespace rocksdb {
 class DB;
+class ColumnFamilyHandle;
+struct LiveFileMetaData;
+struct ExportImportFilesMetaData;
 class Checkpoint {
  public:
@@ -36,6 +40,16 @@ class Checkpoint {
   virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
                                   uint64_t log_size_for_flush = 0);
+  // Exports all live SST files of a specified Column Family onto export_dir,
+  // returning SST files information in metadata.
+  // - SST files will be created as hard links when the directory specified
+  //   is in the same partition as the db directory, copied otherwise.
+  // - export_dir should not already exist and will be created by this API.
+  // - Always triggers a flush.
+ virtual Status ExportColumnFamily(ColumnFamilyHandle* handle, + const std::string& export_dir, + ExportImportFilesMetaData** metadata); + virtual ~Checkpoint() {} }; diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 8535952cd..a52aff5d8 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -120,6 +120,16 @@ class StackableDB : public DB { return db_->IngestExternalFiles(args); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& options, const std::string& column_family_name, + const ImportColumnFamilyOptions& import_options, + const ExportImportFilesMetaData& metadata, + ColumnFamilyHandle** handle) override { + return db_->CreateColumnFamilyWithImport(options, column_family_name, + import_options, metadata, handle); + } + virtual Status VerifyChecksum() override { return db_->VerifyChecksum(); } using DB::KeyMayExist; diff --git a/src.mk b/src.mk index fe930d5f4..4d635173b 100644 --- a/src.mk +++ b/src.mk @@ -36,6 +36,7 @@ LIB_SOURCES = \ db/flush_job.cc \ db/flush_scheduler.cc \ db/forward_iterator.cc \ + db/import_column_family_job.cc \ db/internal_stats.cc \ db/logs_with_prep_tracker.cc \ db/log_reader.cc \ diff --git a/utilities/checkpoint/checkpoint_impl.cc b/utilities/checkpoint/checkpoint_impl.cc index 4835f26da..0639ed2f2 100644 --- a/utilities/checkpoint/checkpoint_impl.cc +++ b/utilities/checkpoint/checkpoint_impl.cc @@ -22,6 +22,7 @@ #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/metadata.h" #include "rocksdb/transaction_log.h" #include "rocksdb/utilities/checkpoint.h" #include "test_util/sync_point.h" @@ -60,6 +61,12 @@ void CheckpointImpl::CleanStagingDirectory( full_private_path.c_str(), s.ToString().c_str()); } +Status Checkpoint::ExportColumnFamily( + ColumnFamilyHandle* /*handle*/, const std::string& /*export_dir*/, + 
ExportImportFilesMetaData** /*metadata*/) {
+  return Status::NotSupported("");
+}
+
 // Builds an openable snapshot of RocksDB
 Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
                                         uint64_t log_size_for_flush) {
@@ -322,6 +329,184 @@ Status CheckpointImpl::CreateCustomCheckpoint(
   return s;
 }
+// Exports all live SST files of a specified Column Family onto export_dir,
+// returning SST files information in metadata.
+Status CheckpointImpl::ExportColumnFamily(
+    ColumnFamilyHandle* handle, const std::string& export_dir,
+    ExportImportFilesMetaData** metadata) {
+  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(handle);
+  const auto cf_name = cfh->GetName();
+  const auto db_options = db_->GetDBOptions();
+
+  assert(metadata != nullptr);
+  assert(*metadata == nullptr);
+  auto s = db_->GetEnv()->FileExists(export_dir);
+  if (s.ok()) {
+    return Status::InvalidArgument("Specified export_dir exists");
+  } else if (!s.IsNotFound()) {
+    assert(s.IsIOError());
+    return s;
+  }
+
+  const auto final_nonslash_idx = export_dir.find_last_not_of('/');
+  if (final_nonslash_idx == std::string::npos) {
+    return Status::InvalidArgument("Specified export_dir invalid");
+  }
+  ROCKS_LOG_INFO(db_options.info_log,
+                 "[%s] export column family onto export directory %s",
+                 cf_name.c_str(), export_dir.c_str());
+
+  // Create a temporary export directory.
+  const auto tmp_export_dir =
+      export_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
+  s = db_->GetEnv()->CreateDir(tmp_export_dir);
+
+  if (s.ok()) {
+    s = db_->Flush(rocksdb::FlushOptions(), handle);
+  }
+
+  ColumnFamilyMetaData db_metadata;
+  if (s.ok()) {
+    // Export live sst files with file deletions disabled.
+    s = db_->DisableFileDeletions();
+    if (s.ok()) {
+      db_->GetColumnFamilyMetaData(handle, &db_metadata);
+
+      s = ExportFilesInMetaData(
+          db_options, db_metadata,
+          [&](const std::string& src_dirname, const std::string& fname) {
+            ROCKS_LOG_INFO(db_options.info_log, "[%s] HardLinking %s",
+                           cf_name.c_str(), fname.c_str());
+            return db_->GetEnv()->LinkFile(src_dirname + fname,
+                                           tmp_export_dir + fname);
+          } /*link_file_cb*/,
+          [&](const std::string& src_dirname, const std::string& fname) {
+            ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s",
+                           cf_name.c_str(), fname.c_str());
+            return CopyFile(db_->GetEnv(), src_dirname + fname,
+                            tmp_export_dir + fname, 0, db_options.use_fsync);
+          } /*copy_file_cb*/);
+
+      const auto enable_status = db_->EnableFileDeletions(false /*force*/);
+      if (s.ok()) {
+        s = enable_status;
+      }
+    }
+  }
+
+  auto moved_to_user_specified_dir = false;
+  if (s.ok()) {
+    // Move temporary export directory to the actual export directory.
+    s = db_->GetEnv()->RenameFile(tmp_export_dir, export_dir);
+  }
+
+  if (s.ok()) {
+    // Fsync export directory.
+    moved_to_user_specified_dir = true;
+    std::unique_ptr<Directory> dir_ptr;
+    s = db_->GetEnv()->NewDirectory(export_dir, &dir_ptr);
+    if (s.ok()) {
+      assert(dir_ptr != nullptr);
+      s = dir_ptr->Fsync();
+    }
+  }
+
+  if (s.ok()) {
+    // Export of files succeeded. Fill in the metadata information.
+    auto result_metadata = new ExportImportFilesMetaData();
+    result_metadata->db_comparator_name = handle->GetComparator()->Name();
+    for (const auto& level_metadata : db_metadata.levels) {
+      for (const auto& file_metadata : level_metadata.files) {
+        LiveFileMetaData live_file_metadata;
+        live_file_metadata.size = file_metadata.size;
+        live_file_metadata.name = std::move(file_metadata.name);
+        live_file_metadata.db_path = export_dir;
+        live_file_metadata.smallest_seqno = file_metadata.smallest_seqno;
+        live_file_metadata.largest_seqno = file_metadata.largest_seqno;
+        live_file_metadata.smallestkey = std::move(file_metadata.smallestkey);
+        live_file_metadata.largestkey = std::move(file_metadata.largestkey);
+        live_file_metadata.level = level_metadata.level;
+        result_metadata->files.push_back(live_file_metadata);
+      }
+      *metadata = result_metadata;
+    }
+    ROCKS_LOG_INFO(db_options.info_log, "[%s] Export succeeded.",
+                   cf_name.c_str());
+  } else {
+    // Failure: Clean up all the files/directories created.
+    ROCKS_LOG_INFO(db_options.info_log, "[%s] Export failed. %s",
+                   cf_name.c_str(), s.ToString().c_str());
+    std::vector<std::string> subchildren;
+    const auto cleanup_dir =
+        moved_to_user_specified_dir ? export_dir : tmp_export_dir;
+    db_->GetEnv()->GetChildren(cleanup_dir, &subchildren);
+    for (const auto& subchild : subchildren) {
+      const auto subchild_path = cleanup_dir + "/" + subchild;
+      const auto status = db_->GetEnv()->DeleteFile(subchild_path);
+      if (!status.ok()) {
+        ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup file %s: %s",
+                       subchild_path.c_str(), status.ToString().c_str());
+      }
+    }
+    const auto status = db_->GetEnv()->DeleteDir(cleanup_dir);
+    if (!status.ok()) {
+      ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup dir %s: %s",
+                     cleanup_dir.c_str(), status.ToString().c_str());
+    }
+  }
+  return s;
+}
+
+Status CheckpointImpl::ExportFilesInMetaData(
+    const DBOptions& db_options, const ColumnFamilyMetaData& metadata,
+    std::function<Status(const std::string& src_dirname,
+                         const std::string& fname)>
+        link_file_cb,
+    std::function<Status(const std::string& src_dirname,
+                         const std::string& fname)>
+        copy_file_cb) {
+  Status s;
+  auto hardlink_file = true;
+
+  // Copy/hard link files in metadata.
+  size_t num_files = 0;
+  for (const auto& level_metadata : metadata.levels) {
+    for (const auto& file_metadata : level_metadata.files) {
+      uint64_t number;
+      FileType type;
+      const auto ok = ParseFileName(file_metadata.name, &number, &type);
+      if (!ok) {
+        s = Status::Corruption("Could not parse file name");
+        break;
+      }
+
+      // We should only get sst files here.
+      assert(type == kTableFile);
+      assert(file_metadata.size > 0 && file_metadata.name[0] == '/');
+      const auto src_fname = file_metadata.name;
+      ++num_files;
+
+      if (hardlink_file) {
+        s = link_file_cb(db_->GetName(), src_fname);
+        if (num_files == 1 && s.IsNotSupported()) {
+          // Fallback to copy if link failed due to cross-device directories.
+ hardlink_file = false; + s = Status::OK(); + } + } + if (!hardlink_file) { + s = copy_file_cb(db_->GetName(), src_fname); + } + if (!s.ok()) { + break; + } + } + } + ROCKS_LOG_INFO(db_options.info_log, "Number of table files %" ROCKSDB_PRIszt, + num_files); + + return s; +} } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/checkpoint/checkpoint_impl.h b/utilities/checkpoint/checkpoint_impl.h index d26a9f66b..0d87b635b 100644 --- a/utilities/checkpoint/checkpoint_impl.h +++ b/utilities/checkpoint/checkpoint_impl.h @@ -30,6 +30,17 @@ class CheckpointImpl : public Checkpoint { virtual Status CreateCheckpoint(const std::string& checkpoint_dir, uint64_t log_size_for_flush) override; + // Exports all live SST files of a specified Column Family onto export_dir + // and returning SST files information in metadata. + // - SST files will be created as hard links when the directory specified + // is in the same partition as the db directory, copied otherwise. + // - export_dir should not already exist and will be created by this API. + // - Always triggers a flush. + using Checkpoint::ExportColumnFamily; + virtual Status ExportColumnFamily( + ColumnFamilyHandle* handle, const std::string& export_dir, + ExportImportFilesMetaData** metadata) override; + // Checkpoint logic can be customized by providing callbacks for link, copy, // or create. Status CreateCustomCheckpoint( @@ -48,6 +59,18 @@ class CheckpointImpl : public Checkpoint { private: void CleanStagingDirectory(const std::string& path, Logger* info_log); + + // Export logic customization by providing callbacks for link or copy. 
+  Status ExportFilesInMetaData(
+      const DBOptions& db_options, const ColumnFamilyMetaData& metadata,
+      std::function<Status(const std::string& src_dirname,
+                           const std::string& fname)>
+          link_file_cb,
+      std::function<Status(const std::string& src_dirname,
+                           const std::string& fname)>
+          copy_file_cb);
+
+ private:
   DB* db_;
 };
diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc
index d7d2548af..d748f500e 100644
--- a/utilities/checkpoint/checkpoint_test.cc
+++ b/utilities/checkpoint/checkpoint_test.cc
@@ -26,6 +26,7 @@
 #include "test_util/fault_injection_test_env.h"
 #include "test_util/sync_point.h"
 #include "test_util/testharness.h"
+#include "test_util/testutil.h"
 namespace rocksdb {
 class CheckpointTest : public testing::Test {
@@ -44,6 +45,9 @@ class CheckpointTest : public testing::Test {
   Options last_options_;
   std::vector<ColumnFamilyHandle*> handles_;
   std::string snapshot_name_;
+  std::string export_path_;
+  ColumnFamilyHandle* cfh_reverse_comp_;
+  ExportImportFilesMetaData* metadata_;
   CheckpointTest() : env_(Env::Default()) {
     env_->SetBackgroundThreads(1, Env::LOW);
@@ -64,12 +68,24 @@ class CheckpointTest : public testing::Test {
     EXPECT_OK(DestroyDB(snapshot_tmp_name, options));
     env_->DeleteDir(snapshot_tmp_name);
     Reopen(options);
+    export_path_ = test::TmpDir(env_) + "/export";
+    test::DestroyDir(env_, export_path_);
+    cfh_reverse_comp_ = nullptr;
+    metadata_ = nullptr;
   }
   ~CheckpointTest() override {
     rocksdb::SyncPoint::GetInstance()->DisableProcessing();
     rocksdb::SyncPoint::GetInstance()->LoadDependency({});
     rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+    if (cfh_reverse_comp_) {
+      EXPECT_OK(db_->DestroyColumnFamilyHandle(cfh_reverse_comp_));
+      cfh_reverse_comp_ = nullptr;
+    }
+    if (metadata_) {
+      delete metadata_;
+      metadata_ = nullptr;
+    }
     Close();
     Options options;
     options.db_paths.emplace_back(dbname_, 0);
@@ -78,6 +94,7 @@ class CheckpointTest : public testing::Test {
     EXPECT_OK(DestroyDB(dbname_, options));
     EXPECT_OK(DestroyDB(snapshot_name_, options));
+    test::DestroyDir(env_, export_path_);
   }
   // Return the current option configuration.
@@ -140,6 +157,12 @@ class CheckpointTest : public testing::Test {
     ASSERT_OK(TryReopen(options));
   }
+  void CompactAll() {
+    for (auto h : handles_) {
+      ASSERT_OK(db_->CompactRange(CompactRangeOptions(), h, nullptr, nullptr));
+    }
+  }
+
   void Close() {
     for (auto h : handles_) {
       delete h;
@@ -289,6 +312,109 @@ TEST_F(CheckpointTest, GetSnapshotLink) {
   }
 }
+TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) {
+  // Create a database
+  Status s;
+  auto options = CurrentOptions();
+  options.create_if_missing = true;
+  CreateAndReopenWithCF({}, options);
+
+  // Helper to verify the number of files in metadata and export dir
+  auto verify_files_exported = [&](const ExportImportFilesMetaData& metadata,
+                                   int num_files_expected) {
+    ASSERT_EQ(metadata.files.size(), num_files_expected);
+    std::vector<std::string> subchildren;
+    env_->GetChildren(export_path_, &subchildren);
+    int num_children = 0;
+    for (const auto& child : subchildren) {
+      if (child != "." && child != "..") {
+        ++num_children;
+      }
+    }
+    ASSERT_EQ(num_children, num_files_expected);
+  };
+
+  // Test DefaultColumnFamily
+  {
+    const auto key = std::string("foo");
+    ASSERT_OK(Put(key, "v1"));
+
+    Checkpoint* checkpoint;
+    ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+
+    // Export the Tables and verify
+    ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
+                                             export_path_, &metadata_));
+    verify_files_exported(*metadata_, 1);
+    ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name());
+    test::DestroyDir(env_, export_path_);
+    delete metadata_;
+    metadata_ = nullptr;
+
+    // Check again after compaction
+    CompactAll();
+    ASSERT_OK(Put(key, "v2"));
+    ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
+                                             export_path_, &metadata_));
+    verify_files_exported(*metadata_, 2);
+    ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name());
+    test::DestroyDir(env_, export_path_);
+    delete metadata_;
+    metadata_ = nullptr;
+    delete checkpoint;
+  }
+
+ // Test non default column family with non default comparator + { + auto cf_options = CurrentOptions(); + cf_options.comparator = ReverseBytewiseComparator(); + ASSERT_OK( + db_->CreateColumnFamily(cf_options, "yoyo", &cfh_reverse_comp_)); + + const auto key = std::string("foo"); + ASSERT_OK(db_->Put(WriteOptions(), cfh_reverse_comp_, key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export the Tables and verify + ASSERT_OK(checkpoint->ExportColumnFamily(cfh_reverse_comp_, export_path_, + &metadata_)); + verify_files_exported(*metadata_, 1); + ASSERT_EQ(metadata_->db_comparator_name, + ReverseBytewiseComparator()->Name()); + delete checkpoint; + } +} + +TEST_F(CheckpointTest, ExportColumnFamilyNegativeTest) { + // Create a database + Status s; + auto options = CurrentOptions(); + options.create_if_missing = true; + CreateAndReopenWithCF({}, options); + + const auto key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export onto existing directory + env_->CreateDirIfMissing(export_path_); + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_), + Status::InvalidArgument("Specified export_dir exists")); + test::DestroyDir(env_, export_path_); + + // Export with invalid directory specification + export_path_ = ""; + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_), + Status::InvalidArgument("Specified export_dir invalid")); + delete checkpoint; +} + TEST_F(CheckpointTest, CheckpointCF) { Options options = CurrentOptions(); CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options);