From 22ce4624509694b8c35a15ef1fc49d3013f05a96 Mon Sep 17 00:00:00 2001 From: Venki Pallipadi Date: Wed, 17 Jul 2019 12:22:21 -0700 Subject: [PATCH] Export Import sst files (#5495) Summary: Refresh of the earlier change here - https://github.com/facebook/rocksdb/issues/5135 This is a review request for code change needed for - https://github.com/facebook/rocksdb/issues/3469 "Add support for taking snapshot of a column family and creating column family from a given CF snapshot" We have an implementation for this that we have been testing internally. We have two new APIs that together provide this functionality. (1) ExportColumnFamily() - This API is modelled after CreateCheckpoint() as below. // Exports all live SST files of a specified Column Family onto export_dir, // returning SST files information in metadata. // - SST files will be created as hard links when the directory specified // is in the same partition as the db directory, copied otherwise. // - export_dir should not already exist and will be created by this API. // - Always triggers a flush. virtual Status ExportColumnFamily(ColumnFamilyHandle* handle, const std::string& export_dir, ExportImportFilesMetaData** metadata); Internally, the API will DisableFileDeletions(), GetColumnFamilyMetaData(), Parse through metadata, creating links/copies of all the sst files, EnableFileDeletions() and complete the call by returning the list of file metadata. (2) CreateColumnFamilyWithImport() - This API is modeled after IngestExternalFile(), but invoked only during a CF creation as below. // CreateColumnFamilyWithImport() will create a new column family with // column_family_name and import external SST files specified in metadata into // this column family. // (1) External SST files can be created using SstFileWriter. // (2) External SST files can be exported from a particular column family in // an existing DB. // Option in import_options specifies whether the external files are copied or // moved (default is copy). When option specifies copy, managing files at // external_file_path is caller's responsibility. When option specifies a // move, the call ensures that the specified files at external_file_path are // deleted on successful return and files are not modified on any error // return. // On error return, column family handle returned will be nullptr. // ColumnFamily will be present on successful return and will not be present // on error return. ColumnFamily may be present on any crash during this call. virtual Status CreateColumnFamilyWithImport( const ColumnFamilyOptions& options, const std::string& column_family_name, const ImportColumnFamilyOptions& import_options, const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle); Internally, this API creates a new CF, parses all the sst files and adds it to the specified column family, at the same level and with same sequence number as in the metadata. Also performs safety checks with respect to overlaps between the sst files being imported. If incoming sequence number is higher than current local sequence number, local sequence number is updated to reflect this. Note, as the sst files is are being moved across Column Families, Column Family name in sst file will no longer match the actual column family on destination DB. The API does not modify Column Family name or id in the sst files being imported. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5495 Differential Revision: D16018881 fbshipit-source-id: 9ae2251025d5916d35a9fc4ea4d6707f6be16ff9 --- CMakeLists.txt | 1 + Makefile | 5 + TARGETS | 1 + db/compacted_db_impl.h | 9 + db/db_impl/db_impl.cc | 122 +++++ db/db_impl/db_impl.h | 11 +- db/db_impl/db_impl_readonly.h | 10 + db/db_test.cc | 10 + db/import_column_family_job.cc | 257 +++++++++++ db/import_column_family_job.h | 70 +++ db/import_column_family_test.cc | 565 +++++++++++++++++++++++ include/rocksdb/db.h | 21 + include/rocksdb/metadata.h | 7 + include/rocksdb/options.h | 6 + include/rocksdb/utilities/checkpoint.h | 14 + include/rocksdb/utilities/stackable_db.h | 10 + src.mk | 1 + utilities/checkpoint/checkpoint_impl.cc | 185 ++++++++ utilities/checkpoint/checkpoint_impl.h | 23 + utilities/checkpoint/checkpoint_test.cc | 126 +++++ 20 files changed, 1453 insertions(+), 1 deletion(-) create mode 100644 db/import_column_family_job.cc create mode 100644 db/import_column_family_job.h create mode 100644 db/import_column_family_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 65904b8ca..b49a13572 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -520,6 +520,7 @@ set(SOURCES db/flush_job.cc db/flush_scheduler.cc db/forward_iterator.cc + db/import_column_family_job.cc db/internal_stats.cc db/logs_with_prep_tracker.cc db/log_reader.cc diff --git a/Makefile b/Makefile index 100f160ca..f8a904bd3 100644 --- a/Makefile +++ b/Makefile @@ -500,6 +500,7 @@ TESTS = \ plain_table_db_test \ comparator_db_test \ external_sst_file_test \ + import_column_family_test \ prefix_test \ skiplist_test \ write_buffer_manager_test \ @@ -577,6 +578,7 @@ PARALLEL_TEST = \ db_universal_compaction_test \ db_wal_test \ external_sst_file_test \ + import_column_family_test \ fault_injection_test \ inlineskiplist_test \ manual_compaction_test \ @@ -1274,6 +1276,9 @@ external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util. external_sst_file_test: db/external_sst_file_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +import_column_family_test: db/import_column_family_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index eda105139..cfd9ef73d 100644 --- a/TARGETS +++ b/TARGETS @@ -113,6 +113,7 @@ cpp_library( "db/flush_job.cc", "db/flush_scheduler.cc", "db/forward_iterator.cc", + "db/import_column_family_job.cc", "db/internal_stats.cc", "db/log_reader.cc", "db/log_writer.cc", diff --git a/db/compacted_db_impl.h b/db/compacted_db_impl.h index c1b8da9a7..e71ce2494 100644 --- a/db/compacted_db_impl.h +++ b/db/compacted_db_impl.h @@ -85,6 +85,15 @@ class CompactedDBImpl : public DBImpl { const IngestExternalFileOptions& /*ingestion_options*/) override { return Status::NotSupported("Not supported in compacted db mode."); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& /*options*/, + const std::string& /*column_family_name*/, + const ImportColumnFamilyOptions& /*import_options*/, + const ExportImportFilesMetaData& /*metadata*/, + ColumnFamilyHandle** /*handle*/) override { + return Status::NotSupported("Not supported in compacted db mode."); + } private: friend class DB; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 6f2ebdc80..af9aea011 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -33,6 +33,7 @@ #include "db/error_handler.h" #include "db/event_helpers.h" #include "db/external_sst_file_ingestion_job.h" +#include "db/import_column_family_job.h" #include "db/flush_job.h" #include "db/forward_iterator.h" #include "db/job_context.h" @@ -3894,6 +3895,127 @@ Status DBImpl::IngestExternalFiles( return status; } +Status DBImpl::CreateColumnFamilyWithImport( + const ColumnFamilyOptions& options, const std::string& column_family_name, + const ImportColumnFamilyOptions& import_options, + const ExportImportFilesMetaData& metadata, + ColumnFamilyHandle** handle) { + assert(handle != nullptr); + assert(*handle == nullptr); + std::string cf_comparator_name = options.comparator->Name(); + if (cf_comparator_name != metadata.db_comparator_name) { + return Status::InvalidArgument("Comparator name mismatch"); + } + + // Create column family. + auto status = CreateColumnFamily(options, column_family_name, handle); + if (!status.ok()) { + return status; + } + + // Import sst files from metadata. + auto cfh = reinterpret_cast(*handle); + auto cfd = cfh->cfd(); + ImportColumnFamilyJob import_job(env_, versions_.get(), cfd, + immutable_db_options_, env_options_, + import_options, metadata.files); + + SuperVersionContext dummy_sv_ctx(/* create_superversion */ true); + VersionEdit dummy_edit; + uint64_t next_file_number = 0; + std::list::iterator pending_output_elem; + { + // Lock db mutex + InstrumentedMutexLock l(&mutex_); + if (error_handler_.IsDBStopped()) { + // Don't import files when there is a bg_error + status = error_handler_.GetBGError(); + } + + // Make sure that bg cleanup wont delete the files that we are importing + pending_output_elem = CaptureCurrentFileNumberInPendingOutputs(); + + if (status.ok()) { + // If crash happen after a hard link established, Recover function may + // reuse the file number that has already assigned to the internal file, + // and this will overwrite the external file. To protect the external + // file, we have to make sure the file number will never being reused. + next_file_number = + versions_->FetchAddFileNumber(metadata.files.size()); + auto cf_options = cfd->GetLatestMutableCFOptions(); + status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_, + directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options); + } + } + } + dummy_sv_ctx.Clean(); + + if (status.ok()) { + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + status = import_job.Prepare(next_file_number, sv); + CleanupSuperVersion(sv); + } + + if (status.ok()) { + SuperVersionContext sv_context(true /*create_superversion*/); + { + // Lock db mutex + InstrumentedMutexLock l(&mutex_); + + // Stop writes to the DB by entering both write threads + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + + num_running_ingest_file_++; + assert(!cfd->IsDropped()); + status = import_job.Run(); + + // Install job edit [Mutex will be unlocked here] + if (status.ok()) { + auto cf_options = cfd->GetLatestMutableCFOptions(); + status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(), + &mutex_, directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options); + } + } + + // Resume writes to the DB + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + write_thread_.ExitUnbatched(&w); + + num_running_ingest_file_--; + if (num_running_ingest_file_ == 0) { + bg_cv_.SignalAll(); + } + } + // mutex_ is unlocked here + + sv_context.Clean(); + } + + { + InstrumentedMutexLock l(&mutex_); + ReleaseFileNumberFromPendingOutputs(pending_output_elem); + } + + import_job.Cleanup(status); + if (!status.ok()) { + DropColumnFamily(*handle); + DestroyColumnFamilyHandle(*handle); + *handle = nullptr; + } + return status; +} + Status DBImpl::VerifyChecksum() { Status s; std::vector cfd_list; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index d417035b1..547e3e1d6 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -27,6 +27,7 @@ #include "db/external_sst_file_ingestion_job.h" #include "db/flush_job.h" #include "db/flush_scheduler.h" +#include "db/import_column_family_job.h" #include "db/internal_stats.h" #include "db/log_writer.h" #include "db/logs_with_prep_tracker.h" @@ -356,6 +357,13 @@ class DBImpl : public DB { virtual Status IngestExternalFiles( const std::vector& args) override; + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& options, const std::string& column_family_name, + const ImportColumnFamilyOptions& import_options, + const ExportImportFilesMetaData& metadata, + ColumnFamilyHandle** handle) override; + virtual Status VerifyChecksum() override; using DB::StartTrace; @@ -1803,7 +1811,8 @@ class DBImpl : public DB { std::string db_absolute_path_; - // Number of running IngestExternalFile() calls. + // Number of running IngestExternalFile() or CreateColumnFamilyWithImport() + // calls. // REQUIRES: mutex held int num_running_ingest_file_; diff --git a/db/db_impl/db_impl_readonly.h b/db/db_impl/db_impl_readonly.h index 18df900cb..ad307677c 100644 --- a/db/db_impl/db_impl_readonly.h +++ b/db/db_impl/db_impl_readonly.h @@ -115,6 +115,16 @@ class DBImplReadOnly : public DBImpl { return Status::NotSupported("Not supported operation in read only mode."); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& /*options*/, + const std::string& /*column_family_name*/, + const ImportColumnFamilyOptions& /*import_options*/, + const ExportImportFilesMetaData& /*metadata*/, + ColumnFamilyHandle** /*handle*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + private: friend class DB; diff --git a/db/db_test.cc b/db/db_test.cc index 69e91923c..36bdda59e 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2492,6 +2492,16 @@ class ModelDB : public DB { return Status::NotSupported("Not implemented"); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& /*options*/, + const std::string& /*column_family_name*/, + const ImportColumnFamilyOptions& /*import_options*/, + const ExportImportFilesMetaData& /*metadata*/, + ColumnFamilyHandle** /*handle*/) override { + return Status::NotSupported("Not implemented."); + } + Status VerifyChecksum() override { return Status::NotSupported("Not implemented."); } diff --git a/db/import_column_family_job.cc b/db/import_column_family_job.cc new file mode 100644 index 000000000..3c00a2591 --- /dev/null +++ b/db/import_column_family_job.cc @@ -0,0 +1,257 @@ +#ifndef ROCKSDB_LITE + +#include "db/import_column_family_job.h" + +#include +#include +#include +#include + +#include "db/version_edit.h" +#include "file/file_util.h" +#include "table/merging_iterator.h" +#include "table/scoped_arena_iterator.h" +#include "table/sst_file_writer_collectors.h" +#include "table/table_builder.h" +#include "util/file_reader_writer.h" +#include "util/stop_watch.h" + +namespace rocksdb { + +Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number, + SuperVersion* sv) { + Status status; + + // Read the information of files we are importing + for (const auto& file_metadata : metadata_) { + const auto file_path = file_metadata.db_path + "/" + file_metadata.name; + IngestedFileInfo file_to_import; + status = GetIngestedFileInfo(file_path, &file_to_import, sv); + if (!status.ok()) { + return status; + } + files_to_import_.push_back(file_to_import); + } + + const auto ucmp = cfd_->internal_comparator().user_comparator(); + auto num_files = files_to_import_.size(); + if (num_files == 0) { + return Status::InvalidArgument("The list of files is empty"); + } else if (num_files > 1) { + // Verify that passed files don't have overlapping ranges in any particular + // level. + int min_level = 1; // Check for overlaps in Level 1 and above. + int max_level = -1; + for (const auto& file_metadata : metadata_) { + if (file_metadata.level > max_level) { + max_level = file_metadata.level; + } + } + for (int level = min_level; level <= max_level; ++level) { + autovector sorted_files; + for (size_t i = 0; i < num_files; i++) { + if (metadata_[i].level == level) { + sorted_files.push_back(&files_to_import_[i]); + } + } + + std::sort(sorted_files.begin(), sorted_files.end(), + [&ucmp](const IngestedFileInfo* info1, + const IngestedFileInfo* info2) { + return ucmp->Compare(info1->smallest_user_key, + info2->smallest_user_key) < 0; + }); + + for (size_t i = 0; i < sorted_files.size() - 1; i++) { + if (ucmp->Compare(sorted_files[i]->largest_user_key, + sorted_files[i + 1]->smallest_user_key) >= 0) { + return Status::InvalidArgument("Files have overlapping ranges"); + } + } + } + } + + for (const auto& f : files_to_import_) { + if (f.num_entries == 0) { + return Status::InvalidArgument("File contain no entries"); + } + + if (!f.smallest_internal_key().Valid() || + !f.largest_internal_key().Valid()) { + return Status::Corruption("File has corrupted keys"); + } + } + + // Copy/Move external files into DB + auto hardlink_files = import_options_.move_files; + for (auto& f : files_to_import_) { + f.fd = FileDescriptor(next_file_number++, 0, f.file_size); + + const auto path_outside_db = f.external_file_path; + const auto path_inside_db = TableFileName( + cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId()); + + if (hardlink_files) { + status = env_->LinkFile(path_outside_db, path_inside_db); + if (status.IsNotSupported()) { + // Original file is on a different FS, use copy instead of hard linking + hardlink_files = false; + } + } + if (!hardlink_files) { + status = CopyFile(env_, path_outside_db, path_inside_db, 0, + db_options_.use_fsync); + } + if (!status.ok()) { + break; + } + f.copy_file = !hardlink_files; + f.internal_file_path = path_inside_db; + } + + if (!status.ok()) { + // We failed, remove all files that we copied into the db + for (const auto& f : files_to_import_) { + if (f.internal_file_path.empty()) { + break; + } + const auto s = env_->DeleteFile(f.internal_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + f.internal_file_path.c_str(), s.ToString().c_str()); + } + } + } + + return status; +} + +// REQUIRES: we have become the only writer by entering both write_thread_ and +// nonmem_write_thread_ +Status ImportColumnFamilyJob::Run() { + Status status; + edit_.SetColumnFamily(cfd_->GetID()); + + for (size_t i = 0; i < files_to_import_.size(); ++i) { + const auto& f = files_to_import_[i]; + const auto& file_metadata = metadata_[i]; + edit_.AddFile(file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(), + f.fd.GetFileSize(), f.smallest_internal_key(), + f.largest_internal_key(), file_metadata.smallest_seqno, + file_metadata.largest_seqno, false); + + // If incoming sequence number is higher, update local sequence number. + if (file_metadata.largest_seqno > versions_->LastSequence()) { + versions_->SetLastAllocatedSequence(file_metadata.largest_seqno); + versions_->SetLastPublishedSequence(file_metadata.largest_seqno); + versions_->SetLastSequence(file_metadata.largest_seqno); + } + } + + return status; +} + +void ImportColumnFamilyJob::Cleanup(const Status& status) { + if (!status.ok()) { + // We failed to add files to the database remove all the files we copied. + for (const auto& f : files_to_import_) { + const auto s = env_->DeleteFile(f.internal_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + f.internal_file_path.c_str(), s.ToString().c_str()); + } + } + } else if (status.ok() && import_options_.move_files) { + // The files were moved and added successfully, remove original file links + for (IngestedFileInfo& f : files_to_import_) { + const auto s = env_->DeleteFile(f.external_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN( + db_options_.info_log, + "%s was added to DB successfully but failed to remove original " + "file link : %s", + f.external_file_path.c_str(), s.ToString().c_str()); + } + } + } +} + +Status ImportColumnFamilyJob::GetIngestedFileInfo( + const std::string& external_file, IngestedFileInfo* file_to_import, + SuperVersion* sv) { + file_to_import->external_file_path = external_file; + + // Get external file size + auto status = env_->GetFileSize(external_file, &file_to_import->file_size); + if (!status.ok()) { + return status; + } + + // Create TableReader for external file + std::unique_ptr table_reader; + std::unique_ptr sst_file; + std::unique_ptr sst_file_reader; + + status = env_->NewRandomAccessFile(external_file, &sst_file, env_options_); + if (!status.ok()) { + return status; + } + sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file), + external_file)); + + status = cfd_->ioptions()->table_factory->NewTableReader( + TableReaderOptions(*cfd_->ioptions(), + sv->mutable_cf_options.prefix_extractor.get(), + env_options_, cfd_->internal_comparator()), + std::move(sst_file_reader), file_to_import->file_size, &table_reader); + if (!status.ok()) { + return status; + } + + // Get the external file properties + auto props = table_reader->GetTableProperties(); + + // Set original_seqno to 0. + file_to_import->original_seqno = 0; + + // Get number of entries in table + file_to_import->num_entries = props->num_entries; + + ParsedInternalKey key; + ReadOptions ro; + // During reading the external file we can cache blocks that we read into + // the block cache, if we later change the global seqno of this file, we will + // have block in cache that will include keys with wrong seqno. + // We need to disable fill_cache so that we read from the file without + // updating the block cache. + ro.fill_cache = false; + std::unique_ptr iter(table_reader->NewIterator( + ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion)); + + // Get first (smallest) key from file + iter->SeekToFirst(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + file_to_import->smallest_user_key = key.user_key.ToString(); + + // Get last (largest) key from file + iter->SeekToLast(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + file_to_import->largest_user_key = key.user_key.ToString(); + + file_to_import->cf_id = static_cast(props->column_family_id); + + file_to_import->table_properties = *props; + + return status; +} + +} // namespace rocksdb + +#endif // !ROCKSDB_LITE diff --git a/db/import_column_family_job.h b/db/import_column_family_job.h new file mode 100644 index 000000000..5b8577df1 --- /dev/null +++ b/db/import_column_family_job.h @@ -0,0 +1,70 @@ +#pragma once +#include +#include +#include + +#include "db/column_family.h" +#include "db/dbformat.h" +#include "db/external_sst_file_ingestion_job.h" +#include "db/snapshot_impl.h" +#include "options/db_options.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/metadata.h" +#include "rocksdb/sst_file_writer.h" +#include "util/autovector.h" + +namespace rocksdb { + +// Imports a set of sst files as is into a new column family. Logic is similar +// to ExternalSstFileIngestionJob. +class ImportColumnFamilyJob { + public: + ImportColumnFamilyJob( + Env* env, VersionSet* versions, ColumnFamilyData* cfd, + const ImmutableDBOptions& db_options, const EnvOptions& env_options, + const ImportColumnFamilyOptions& import_options, + const std::vector& metadata) + : env_(env), + versions_(versions), + cfd_(cfd), + db_options_(db_options), + env_options_(env_options), + import_options_(import_options), + metadata_(metadata) {} + + // Prepare the job by copying external files into the DB. + Status Prepare(uint64_t next_file_number, SuperVersion* sv); + + // Will execute the import job and prepare edit() to be applied. + // REQUIRES: Mutex held + Status Run(); + + // Cleanup after successful/failed job + void Cleanup(const Status& status); + + VersionEdit* edit() { return &edit_; } + + const autovector& files_to_import() const { + return files_to_import_; + } + + private: + // Open the external file and populate `file_to_import` with all the + // external information we need to import this file. + Status GetIngestedFileInfo(const std::string& external_file, + IngestedFileInfo* file_to_import, + SuperVersion* sv); + + Env* env_; + VersionSet* versions_; + ColumnFamilyData* cfd_; + const ImmutableDBOptions& db_options_; + const EnvOptions& env_options_; + autovector files_to_import_; + VersionEdit edit_; + const ImportColumnFamilyOptions& import_options_; + std::vector metadata_; +}; + +} // namespace rocksdb diff --git a/db/import_column_family_test.cc b/db/import_column_family_test.cc new file mode 100644 index 000000000..a93ecbf11 --- /dev/null +++ b/db/import_column_family_test.cc @@ -0,0 +1,565 @@ +#ifndef ROCKSDB_LITE + +#include +#include "db/db_test_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/sst_file_writer.h" +#include "test_util/testutil.h" + +namespace rocksdb { + +class ImportColumnFamilyTest : public DBTestBase { + public: + ImportColumnFamilyTest() : DBTestBase("/import_column_family_test") { + sst_files_dir_ = dbname_ + "/sst_files/"; + DestroyAndRecreateExternalSSTFilesDir(); + export_files_dir_ = test::TmpDir(env_) + "/export"; + import_cfh_ = nullptr; + import_cfh2_ = nullptr; + metadata_ptr_ = nullptr; + } + + ~ImportColumnFamilyTest() { + if (import_cfh_) { + db_->DropColumnFamily(import_cfh_); + db_->DestroyColumnFamilyHandle(import_cfh_); + import_cfh_ = nullptr; + } + if (import_cfh2_) { + db_->DropColumnFamily(import_cfh2_); + db_->DestroyColumnFamilyHandle(import_cfh2_); + import_cfh2_ = nullptr; + } + if (metadata_ptr_) { + delete metadata_ptr_; + metadata_ptr_ = nullptr; + } + test::DestroyDir(env_, sst_files_dir_); + test::DestroyDir(env_, export_files_dir_); + } + + void DestroyAndRecreateExternalSSTFilesDir() { + test::DestroyDir(env_, sst_files_dir_); + env_->CreateDir(sst_files_dir_); + test::DestroyDir(env_, export_files_dir_); + } + + LiveFileMetaData LiveFileMetaDataInit(std::string name, + std::string path, + int level, + SequenceNumber smallest_seqno, + SequenceNumber largest_seqno) { + LiveFileMetaData metadata; + metadata.name = name; + metadata.db_path = path; + metadata.smallest_seqno = smallest_seqno; + metadata.largest_seqno = largest_seqno; + metadata.level = level; + return metadata; + } + + protected: + std::string sst_files_dir_; + std::string export_files_dir_; + ColumnFamilyHandle* import_cfh_; + ColumnFamilyHandle* import_cfh2_; + ExportImportFilesMetaData *metadata_ptr_; +}; + +TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFiles) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + SstFileWriter sfw_unknown(EnvOptions(), options); + + // cf1.sst + const std::string cf1_sst_name = "cf1.sst"; + const std::string cf1_sst = sst_files_dir_ + cf1_sst_name; + ASSERT_OK(sfw_cf1.Open(cf1_sst)); + ASSERT_OK(sfw_cf1.Put("K1", "V1")); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Finish()); + + // cf_unknown.sst + const std::string unknown_sst_name = "cf_unknown.sst"; + const std::string unknown_sst = sst_files_dir_ + unknown_sst_name; + ASSERT_OK(sfw_unknown.Open(unknown_sst)); + ASSERT_OK(sfw_unknown.Put("K3", "V1")); + ASSERT_OK(sfw_unknown.Put("K4", "V2")); + ASSERT_OK(sfw_unknown.Finish()); + + { + // Import sst file corresponding to cf1 onto a new cf and verify + ExportImportFilesMetaData metadata; + metadata.files.push_back( + LiveFileMetaDataInit(cf1_sst_name, sst_files_dir_, 0, 10, 19)); + metadata.db_comparator_name = options.comparator->Name(); + + ASSERT_OK(db_->CreateColumnFamilyWithImport( + options, "toto", ImportColumnFamilyOptions(), metadata, &import_cfh_)); + ASSERT_NE(import_cfh_, nullptr); + + std::string value; + db_->Get(ReadOptions(), import_cfh_, "K1", &value); + ASSERT_EQ(value, "V1"); + db_->Get(ReadOptions(), import_cfh_, "K2", &value); + ASSERT_EQ(value, "V2"); + ASSERT_OK(db_->DropColumnFamily(import_cfh_)); + ASSERT_OK(db_->DestroyColumnFamilyHandle(import_cfh_)); + import_cfh_ = nullptr; + } + + { + // Import sst file corresponding to unknown cf onto a new cf and verify + ExportImportFilesMetaData metadata; + metadata.files.push_back( + LiveFileMetaDataInit(unknown_sst_name, sst_files_dir_, 0, 20, 29)); + metadata.db_comparator_name = options.comparator->Name(); + + ASSERT_OK(db_->CreateColumnFamilyWithImport( + options, "yoyo", ImportColumnFamilyOptions(), metadata, &import_cfh_)); + ASSERT_NE(import_cfh_, nullptr); + + std::string value; + db_->Get(ReadOptions(), import_cfh_, "K3", &value); + ASSERT_EQ(value, "V1"); + db_->Get(ReadOptions(), import_cfh_, "K4", &value); + ASSERT_EQ(value, "V2"); + } +} + +TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFilesWithOverlap) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + + // file3.sst + const std::string file3_sst_name = "file3.sst"; + const std::string file3_sst = sst_files_dir_ + file3_sst_name; + ASSERT_OK(sfw_cf1.Open(file3_sst)); + for (int i = 0; i < 100; ++i) { + sfw_cf1.Put(Key(i), Key(i) + "_val"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file2.sst + const std::string file2_sst_name = "file2.sst"; + const std::string file2_sst = sst_files_dir_ + file2_sst_name; + ASSERT_OK(sfw_cf1.Open(file2_sst)); + for (int i = 0; i < 100; i += 2) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite1"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file1a.sst + const std::string file1a_sst_name = "file1a.sst"; + const std::string file1a_sst = sst_files_dir_ + file1a_sst_name; + ASSERT_OK(sfw_cf1.Open(file1a_sst)); + for (int i = 0; i < 52; i += 4) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite2"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file1b.sst + const std::string file1b_sst_name = "file1b.sst"; + const std::string file1b_sst = sst_files_dir_ + file1b_sst_name; + ASSERT_OK(sfw_cf1.Open(file1b_sst)); + for (int i = 52; i < 100; i += 4) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite2"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file0a.sst + const std::string file0a_sst_name = "file0a.sst"; + const std::string file0a_sst = sst_files_dir_ + file0a_sst_name; + ASSERT_OK(sfw_cf1.Open(file0a_sst)); + for (int i = 0; i < 100; i += 16) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite3"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file0b.sst + const std::string file0b_sst_name = "file0b.sst"; + const std::string file0b_sst = sst_files_dir_ + file0b_sst_name; + ASSERT_OK(sfw_cf1.Open(file0b_sst)); + for (int i = 0; i < 100; i += 16) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite4"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // Import sst files and verify + ExportImportFilesMetaData metadata; + metadata.files.push_back( + LiveFileMetaDataInit(file3_sst_name, sst_files_dir_, 3, 10, 19)); + metadata.files.push_back( + LiveFileMetaDataInit(file2_sst_name, sst_files_dir_, 2, 20, 29)); + metadata.files.push_back( + LiveFileMetaDataInit(file1a_sst_name, sst_files_dir_, 1, 30, 34)); + metadata.files.push_back( + LiveFileMetaDataInit(file1b_sst_name, sst_files_dir_, 1, 35, 39)); + metadata.files.push_back( + LiveFileMetaDataInit(file0a_sst_name, sst_files_dir_, 0, 40, 49)); + metadata.files.push_back( + LiveFileMetaDataInit(file0b_sst_name, sst_files_dir_, 0, 50, 59)); + metadata.db_comparator_name = options.comparator->Name(); + + ASSERT_OK(db_->CreateColumnFamilyWithImport( + options, "toto", ImportColumnFamilyOptions(), metadata, &import_cfh_)); + ASSERT_NE(import_cfh_, nullptr); + + for (int i = 0; i < 100; i++) { + std::string value; + db_->Get(ReadOptions(), import_cfh_, Key(i), &value); + if (i % 16 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite4"); + } else if (i % 4 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite2"); + } else if (i % 2 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite1"); + } else { + ASSERT_EQ(value, Key(i) + "_val"); + } + } + + for (int i = 0; i < 100; i += 5) { + ASSERT_OK( + db_->Put(WriteOptions(), import_cfh_, Key(i), Key(i) + "_overwrite5")); + } + + // Flush and check again + ASSERT_OK(db_->Flush(FlushOptions(), import_cfh_)); + for (int i = 0; i < 100; i++) { + std::string value; + db_->Get(ReadOptions(), import_cfh_, Key(i), &value); + if (i % 5 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite5"); + } else if (i % 16 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite4"); + } else if (i % 4 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite2"); + } else if (i % 2 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite1"); + } else { + ASSERT_EQ(value, Key(i) + "_val"); + } + } + + // Compact and check again. + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), import_cfh_, nullptr, nullptr)); + for (int i = 0; i < 100; i++) { + std::string value; + db_->Get(ReadOptions(), import_cfh_, Key(i), &value); + if (i % 5 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite5"); + } else if (i % 16 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite4"); + } else if (i % 4 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite2"); + } else if (i % 2 == 0) { + ASSERT_EQ(value, Key(i) + "_overwrite1"); + } else { + ASSERT_EQ(value, Key(i) + "_val"); + } + } +} + +TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherCF) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_val"); + } + ASSERT_OK(Flush(1)); + + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr)); + + // Overwrite the value in the same set of keys. + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_overwrite"); + } + + // Flush to create L0 file. + ASSERT_OK(Flush(1)); + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_overwrite2"); + } + + // Flush again to create another L0 file. It should have higher sequencer. + ASSERT_OK(Flush(1)); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_, + &metadata_ptr_)); + ASSERT_NE(metadata_ptr_, nullptr); + + ImportColumnFamilyOptions import_options; + import_options.move_files = false; + ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "toto", import_options, + *metadata_ptr_, &import_cfh_)); + ASSERT_NE(import_cfh_, nullptr); + + import_options.move_files = true; + ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "yoyo", import_options, + *metadata_ptr_, &import_cfh2_)); + ASSERT_NE(import_cfh2_, nullptr); + delete metadata_ptr_; + metadata_ptr_ = NULL; + + std::string value1, value2; + + for (int i = 0; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1); + ASSERT_EQ(Get(1, Key(i)), value1); + } + + for (int i = 0; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2); + ASSERT_EQ(Get(1, Key(i)), value2); + } + + // Modify keys in cf1 and verify. + for (int i = 0; i < 25; i++) { + ASSERT_OK(db_->Delete(WriteOptions(), import_cfh_, Key(i))); + } + for (int i = 25; i < 50; i++) { + ASSERT_OK( + db_->Put(WriteOptions(), import_cfh_, Key(i), Key(i) + "_overwrite3")); + } + for (int i = 0; i < 25; ++i) { + ASSERT_TRUE( + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1).IsNotFound()); + } + for (int i = 25; i < 50; ++i) { + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1); + ASSERT_EQ(Key(i) + "_overwrite3", value1); + } + for (int i = 50; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1); + ASSERT_EQ(Key(i) + "_overwrite2", value1); + } + + for (int i = 0; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2); + ASSERT_EQ(Get(1, Key(i)), value2); + } + + // Compact and check again. + ASSERT_OK(db_->Flush(FlushOptions(), import_cfh_)); + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), import_cfh_, nullptr, nullptr)); + + for (int i = 0; i < 25; ++i) { + ASSERT_TRUE( + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1).IsNotFound()); + } + for (int i = 25; i < 50; ++i) { + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1); + ASSERT_EQ(Key(i) + "_overwrite3", value1); + } + for (int i = 50; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh_, Key(i), &value1); + ASSERT_EQ(Key(i) + "_overwrite2", value1); + } + + for (int i = 0; i < 100; ++i) { + db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2); + ASSERT_EQ(Get(1, Key(i)), value2); + } +} + +TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherDB) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_val"); + } + ASSERT_OK(Flush(1)); + + // Compact to create a L1 file. + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr)); + + // Overwrite the value in the same set of keys. + for (int i = 0; i < 50; ++i) { + Put(1, Key(i), Key(i) + "_overwrite"); + } + + // Flush to create L0 file. + ASSERT_OK(Flush(1)); + + for (int i = 0; i < 25; ++i) { + Put(1, Key(i), Key(i) + "_overwrite2"); + } + + // Flush again to create another L0 file. It should have higher sequencer. + ASSERT_OK(Flush(1)); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_, + &metadata_ptr_)); + ASSERT_NE(metadata_ptr_, nullptr); + + // Create a new db and import the files. + DB* db_copy; + test::DestroyDir(env_, dbname_ + "/db_copy"); + ASSERT_OK(DB::Open(options, dbname_ + "/db_copy", &db_copy)); + ColumnFamilyHandle* cfh = nullptr; + ASSERT_OK(db_copy->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + *metadata_ptr_, &cfh)); + ASSERT_NE(cfh, nullptr); + + for (int i = 0; i < 100; ++i) { + std::string value; + db_copy->Get(ReadOptions(), cfh, Key(i), &value); + ASSERT_EQ(Get(1, Key(i)), value); + } + db_copy->DropColumnFamily(cfh); + test::DestroyDir(env_, dbname_ + "/db_copy"); +} + +TEST_F(ImportColumnFamilyTest, ImportColumnFamilyNegativeTest) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + { + // Create column family with existing cf name. + ExportImportFilesMetaData metadata; + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "koko", + ImportColumnFamilyOptions(), + metadata, &import_cfh_), + Status::InvalidArgument("Column family already exists")); + ASSERT_EQ(import_cfh_, nullptr); + } + + { + // Import with no files specified. + ExportImportFilesMetaData metadata; + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + metadata, &import_cfh_), + Status::InvalidArgument("The list of files is empty")); + ASSERT_EQ(import_cfh_, nullptr); + } + + { + // Import with overlapping keys in sst files. + ExportImportFilesMetaData metadata; + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + const std::string file1_sst_name = "file1.sst"; + const std::string file1_sst = sst_files_dir_ + file1_sst_name; + ASSERT_OK(sfw_cf1.Open(file1_sst)); + ASSERT_OK(sfw_cf1.Put("K1", "V1")); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Finish()); + const std::string file2_sst_name = "file2.sst"; + const std::string file2_sst = sst_files_dir_ + file2_sst_name; + ASSERT_OK(sfw_cf1.Open(file2_sst)); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Put("K3", "V3")); + ASSERT_OK(sfw_cf1.Finish()); + + metadata.files.push_back( + LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.files.push_back( + LiveFileMetaDataInit(file2_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.db_comparator_name = options.comparator->Name(); + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + metadata, &import_cfh_), + Status::InvalidArgument("Files have overlapping ranges")); + ASSERT_EQ(import_cfh_, nullptr); + } + + { + // Import with a mismatching comparator, should fail with appropriate error. + ExportImportFilesMetaData metadata; + Options mismatch_options = CurrentOptions(); + mismatch_options.comparator = ReverseBytewiseComparator(); + SstFileWriter sfw_cf1(EnvOptions(), mismatch_options, handles_[1]); + const std::string file1_sst_name = "file1.sst"; + const std::string file1_sst = sst_files_dir_ + file1_sst_name; + ASSERT_OK(sfw_cf1.Open(file1_sst)); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Put("K1", "V1")); + ASSERT_OK(sfw_cf1.Finish()); + + metadata.files.push_back( + LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.db_comparator_name = mismatch_options.comparator->Name(); + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "coco", + ImportColumnFamilyOptions(), + metadata, &import_cfh_), + Status::InvalidArgument("Comparator name mismatch")); + ASSERT_EQ(import_cfh_, nullptr); + } + + { + // Import with non existent sst file should fail with appropriate error + ExportImportFilesMetaData metadata; + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + const std::string file1_sst_name = "file1.sst"; + const std::string file1_sst = sst_files_dir_ + file1_sst_name; + ASSERT_OK(sfw_cf1.Open(file1_sst)); + ASSERT_OK(sfw_cf1.Put("K1", "V1")); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Finish()); + const std::string file3_sst_name = "file3.sst"; + + metadata.files.push_back( + LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.files.push_back( + LiveFileMetaDataInit(file3_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.db_comparator_name = options.comparator->Name(); + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + metadata, &import_cfh_), + Status::IOError("No such file or directory")); + ASSERT_EQ(import_cfh_, nullptr); + + // Test successful import after a failure with the same CF name. Ensures + // there is no side effect with CF when there is a failed import + metadata.files.pop_back(); + metadata.db_comparator_name = options.comparator->Name(); + + ASSERT_OK(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + metadata, &import_cfh_)); + ASSERT_NE(import_cfh_, nullptr); + } + +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int argc, char** argv) { + fprintf(stderr, + "SKIPPED as External SST File Writer and Import are not supported " + "in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 0f8573e43..d90ca900f 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1174,6 +1174,27 @@ class DB { virtual Status IngestExternalFiles( const std::vector& args) = 0; + // CreateColumnFamilyWithImport() will create a new column family with + // column_family_name and import external SST files specified in metadata into + // this column family. + // (1) External SST files can be created using SstFileWriter. + // (2) External SST files can be exported from a particular column family in + // an existing DB. + // Option in import_options specifies whether the external files are copied or + // moved (default is copy). When option specifies copy, managing files at + // external_file_path is caller's responsibility. When option specifies a + // move, the call ensures that the specified files at external_file_path are + // deleted on successful return and files are not modified on any error + // return. + // On error return, column family handle returned will be nullptr. + // ColumnFamily will be present on successful return and will not be present + // on error return. ColumnFamily may be present on any crash during this call. + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& options, const std::string& column_family_name, + const ImportColumnFamilyOptions& import_options, + const ExportImportFilesMetaData& metadata, + ColumnFamilyHandle** handle) = 0; + virtual Status VerifyChecksum() = 0; // AddFile() is deprecated, please use IngestExternalFile() diff --git a/include/rocksdb/metadata.h b/include/rocksdb/metadata.h index a0ab41efd..7b251eb72 100644 --- a/include/rocksdb/metadata.h +++ b/include/rocksdb/metadata.h @@ -108,4 +108,11 @@ struct LiveFileMetaData : SstFileMetaData { int level; // Level at which this file resides. LiveFileMetaData() : column_family_name(), level(0) {} }; + +// Metadata returned as output from ExportColumnFamily() and used as input to +// CreateColumnFamiliesWithImport(). +struct ExportImportFilesMetaData { + std::string db_comparator_name; // Used to safety check at import. + std::vector files; // Vector of file metadata. +}; } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 8ebcd292d..09dc8e54c 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1491,4 +1491,10 @@ struct TraceOptions { uint64_t filter = kTraceFilterNone; }; +// ImportColumnFamilyOptions is used by ImportColumnFamily() +struct ImportColumnFamilyOptions { + // Can be set to true to move the files instead of copying them. + bool move_files = false; +}; + } // namespace rocksdb diff --git a/include/rocksdb/utilities/checkpoint.h b/include/rocksdb/utilities/checkpoint.h index aa0a394d4..5f12922c4 100644 --- a/include/rocksdb/utilities/checkpoint.h +++ b/include/rocksdb/utilities/checkpoint.h @@ -9,11 +9,15 @@ #ifndef ROCKSDB_LITE #include +#include #include "rocksdb/status.h" namespace rocksdb { class DB; +class ColumnFamilyHandle; +struct LiveFileMetaData; +struct ExportImportFilesMetaData; class Checkpoint { public: @@ -36,6 +40,16 @@ class Checkpoint { virtual Status CreateCheckpoint(const std::string& checkpoint_dir, uint64_t log_size_for_flush = 0); + // Exports all live SST files of a specified Column Family onto export_dir, + // returning SST files information in metadata. + // - SST files will be created as hard links when the directory specified + // is in the same partition as the db directory, copied otherwise. + // - export_dir should not already exist and will be created by this API. + // - Always triggers a flush. + virtual Status ExportColumnFamily(ColumnFamilyHandle* handle, + const std::string& export_dir, + ExportImportFilesMetaData** metadata); + virtual ~Checkpoint() {} }; diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 8535952cd..a52aff5d8 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -120,6 +120,16 @@ class StackableDB : public DB { return db_->IngestExternalFiles(args); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& options, const std::string& column_family_name, + const ImportColumnFamilyOptions& import_options, + const ExportImportFilesMetaData& metadata, + ColumnFamilyHandle** handle) override { + return db_->CreateColumnFamilyWithImport(options, column_family_name, + import_options, metadata, handle); + } + virtual Status VerifyChecksum() override { return db_->VerifyChecksum(); } using DB::KeyMayExist; diff --git a/src.mk b/src.mk index fe930d5f4..4d635173b 100644 --- a/src.mk +++ b/src.mk @@ -36,6 +36,7 @@ LIB_SOURCES = \ db/flush_job.cc \ db/flush_scheduler.cc \ db/forward_iterator.cc \ + db/import_column_family_job.cc \ db/internal_stats.cc \ db/logs_with_prep_tracker.cc \ db/log_reader.cc \ diff --git a/utilities/checkpoint/checkpoint_impl.cc b/utilities/checkpoint/checkpoint_impl.cc index 4835f26da..0639ed2f2 100644 --- a/utilities/checkpoint/checkpoint_impl.cc +++ b/utilities/checkpoint/checkpoint_impl.cc @@ -22,6 +22,7 @@ #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/metadata.h" #include "rocksdb/transaction_log.h" #include "rocksdb/utilities/checkpoint.h" #include "test_util/sync_point.h" @@ -60,6 +61,12 @@ void CheckpointImpl::CleanStagingDirectory( full_private_path.c_str(), s.ToString().c_str()); } +Status Checkpoint::ExportColumnFamily( + ColumnFamilyHandle* /*handle*/, const std::string& /*export_dir*/, + ExportImportFilesMetaData** /*metadata*/) { + return Status::NotSupported(""); +} + // Builds an openable snapshot of RocksDB Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, uint64_t log_size_for_flush) { @@ -322,6 +329,184 @@ Status CheckpointImpl::CreateCustomCheckpoint( return s; } +// Exports all live SST files of a specified Column Family onto export_dir, +// returning SST files information in metadata. +Status CheckpointImpl::ExportColumnFamily( + ColumnFamilyHandle* handle, const std::string& export_dir, + ExportImportFilesMetaData** metadata) { + auto cfh = reinterpret_cast(handle); + const auto cf_name = cfh->GetName(); + const auto db_options = db_->GetDBOptions(); + + assert(metadata != nullptr); + assert(*metadata == nullptr); + auto s = db_->GetEnv()->FileExists(export_dir); + if (s.ok()) { + return Status::InvalidArgument("Specified export_dir exists"); + } else if (!s.IsNotFound()) { + assert(s.IsIOError()); + return s; + } + + const auto final_nonslash_idx = export_dir.find_last_not_of('/'); + if (final_nonslash_idx == std::string::npos) { + return Status::InvalidArgument("Specified export_dir invalid"); + } + ROCKS_LOG_INFO(db_options.info_log, + "[%s] export column family onto export directory %s", + cf_name.c_str(), export_dir.c_str()); + + // Create a temporary export directory. + const auto tmp_export_dir = + export_dir.substr(0, final_nonslash_idx + 1) + ".tmp"; + s = db_->GetEnv()->CreateDir(tmp_export_dir); + + if (s.ok()) { + s = db_->Flush(rocksdb::FlushOptions(), handle); + } + + ColumnFamilyMetaData db_metadata; + if (s.ok()) { + // Export live sst files with file deletions disabled. + s = db_->DisableFileDeletions(); + if (s.ok()) { + db_->GetColumnFamilyMetaData(handle, &db_metadata); + + s = ExportFilesInMetaData( + db_options, db_metadata, + [&](const std::string& src_dirname, const std::string& fname) { + ROCKS_LOG_INFO(db_options.info_log, "[%s] HardLinking %s", + cf_name.c_str(), fname.c_str()); + return db_->GetEnv()->LinkFile(src_dirname + fname, + tmp_export_dir + fname); + } /*link_file_cb*/, + [&](const std::string& src_dirname, const std::string& fname) { + ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s", + cf_name.c_str(), fname.c_str()); + return CopyFile(db_->GetEnv(), src_dirname + fname, + tmp_export_dir + fname, 0, db_options.use_fsync); + } /*copy_file_cb*/); + + const auto enable_status = db_->EnableFileDeletions(false /*force*/); + if (s.ok()) { + s = enable_status; + } + } + } + + auto moved_to_user_specified_dir = false; + if (s.ok()) { + // Move temporary export directory to the actual export directory. + s = db_->GetEnv()->RenameFile(tmp_export_dir, export_dir); + } + + if (s.ok()) { + // Fsync export directory. + moved_to_user_specified_dir = true; + std::unique_ptr dir_ptr; + s = db_->GetEnv()->NewDirectory(export_dir, &dir_ptr); + if (s.ok()) { + assert(dir_ptr != nullptr); + s = dir_ptr->Fsync(); + } + } + + if (s.ok()) { + // Export of files succeeded. Fill in the metadata information. + auto result_metadata = new ExportImportFilesMetaData(); + result_metadata->db_comparator_name = handle->GetComparator()->Name(); + for (const auto& level_metadata : db_metadata.levels) { + for (const auto& file_metadata : level_metadata.files) { + LiveFileMetaData live_file_metadata; + live_file_metadata.size = file_metadata.size; + live_file_metadata.name = std::move(file_metadata.name); + live_file_metadata.db_path = export_dir; + live_file_metadata.smallest_seqno = file_metadata.smallest_seqno; + live_file_metadata.largest_seqno = file_metadata.largest_seqno; + live_file_metadata.smallestkey = std::move(file_metadata.smallestkey); + live_file_metadata.largestkey = std::move(file_metadata.largestkey); + live_file_metadata.level = level_metadata.level; + result_metadata->files.push_back(live_file_metadata); + } + *metadata = result_metadata; + } + ROCKS_LOG_INFO(db_options.info_log, "[%s] Export succeeded.", + cf_name.c_str()); + } else { + // Failure: Clean up all the files/directories created. + ROCKS_LOG_INFO(db_options.info_log, "[%s] Export failed. %s", + cf_name.c_str(), s.ToString().c_str()); + std::vector subchildren; + const auto cleanup_dir = + moved_to_user_specified_dir ? export_dir : tmp_export_dir; + db_->GetEnv()->GetChildren(cleanup_dir, &subchildren); + for (const auto& subchild : subchildren) { + const auto subchild_path = cleanup_dir + "/" + subchild; + const auto status = db_->GetEnv()->DeleteFile(subchild_path); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup file %s: %s", + subchild_path.c_str(), status.ToString().c_str()); + } + } + const auto status = db_->GetEnv()->DeleteDir(cleanup_dir); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup dir %s: %s", + cleanup_dir.c_str(), status.ToString().c_str()); + } + } + return s; +} + +Status CheckpointImpl::ExportFilesInMetaData( + const DBOptions& db_options, const ColumnFamilyMetaData& metadata, + std::function + link_file_cb, + std::function + copy_file_cb) { + Status s; + auto hardlink_file = true; + + // Copy/hard link files in metadata. + size_t num_files = 0; + for (const auto& level_metadata : metadata.levels) { + for (const auto& file_metadata : level_metadata.files) { + uint64_t number; + FileType type; + const auto ok = ParseFileName(file_metadata.name, &number, &type); + if (!ok) { + s = Status::Corruption("Could not parse file name"); + break; + } + + // We should only get sst files here. + assert(type == kTableFile); + assert(file_metadata.size > 0 && file_metadata.name[0] == '/'); + const auto src_fname = file_metadata.name; + ++num_files; + + if (hardlink_file) { + s = link_file_cb(db_->GetName(), src_fname); + if (num_files == 1 && s.IsNotSupported()) { + // Fallback to copy if link failed due to cross-device directories. + hardlink_file = false; + s = Status::OK(); + } + } + if (!hardlink_file) { + s = copy_file_cb(db_->GetName(), src_fname); + } + if (!s.ok()) { + break; + } + } + } + ROCKS_LOG_INFO(db_options.info_log, "Number of table files %" ROCKSDB_PRIszt, + num_files); + + return s; +} } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/checkpoint/checkpoint_impl.h b/utilities/checkpoint/checkpoint_impl.h index d26a9f66b..0d87b635b 100644 --- a/utilities/checkpoint/checkpoint_impl.h +++ b/utilities/checkpoint/checkpoint_impl.h @@ -30,6 +30,17 @@ class CheckpointImpl : public Checkpoint { virtual Status CreateCheckpoint(const std::string& checkpoint_dir, uint64_t log_size_for_flush) override; + // Exports all live SST files of a specified Column Family onto export_dir + // and returning SST files information in metadata. + // - SST files will be created as hard links when the directory specified + // is in the same partition as the db directory, copied otherwise. + // - export_dir should not already exist and will be created by this API. + // - Always triggers a flush. + using Checkpoint::ExportColumnFamily; + virtual Status ExportColumnFamily( + ColumnFamilyHandle* handle, const std::string& export_dir, + ExportImportFilesMetaData** metadata) override; + // Checkpoint logic can be customized by providing callbacks for link, copy, // or create. Status CreateCustomCheckpoint( @@ -48,6 +59,18 @@ class CheckpointImpl : public Checkpoint { private: void CleanStagingDirectory(const std::string& path, Logger* info_log); + + // Export logic customization by providing callbacks for link or copy. + Status ExportFilesInMetaData( + const DBOptions& db_options, const ColumnFamilyMetaData& metadata, + std::function + link_file_cb, + std::function + copy_file_cb); + + private: DB* db_; }; diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc index d7d2548af..d748f500e 100644 --- a/utilities/checkpoint/checkpoint_test.cc +++ b/utilities/checkpoint/checkpoint_test.cc @@ -26,6 +26,7 @@ #include "test_util/fault_injection_test_env.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" +#include "test_util/testutil.h" namespace rocksdb { class CheckpointTest : public testing::Test { @@ -44,6 +45,9 @@ class CheckpointTest : public testing::Test { Options last_options_; std::vector handles_; std::string snapshot_name_; + std::string export_path_; + ColumnFamilyHandle* cfh_reverse_comp_; + ExportImportFilesMetaData* metadata_; CheckpointTest() : env_(Env::Default()) { env_->SetBackgroundThreads(1, Env::LOW); @@ -64,12 +68,24 @@ class CheckpointTest : public testing::Test { EXPECT_OK(DestroyDB(snapshot_tmp_name, options)); env_->DeleteDir(snapshot_tmp_name); Reopen(options); + export_path_ = test::TmpDir(env_) + "/export"; + test::DestroyDir(env_, export_path_); + cfh_reverse_comp_ = nullptr; + metadata_ = nullptr; } ~CheckpointTest() override { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->LoadDependency({}); rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + if (cfh_reverse_comp_) { + EXPECT_OK(db_->DestroyColumnFamilyHandle(cfh_reverse_comp_)); + cfh_reverse_comp_ = nullptr; + } + if (metadata_) { + delete metadata_; + metadata_ = nullptr; + } Close(); Options options; options.db_paths.emplace_back(dbname_, 0); @@ -78,6 +94,7 @@ class CheckpointTest : public testing::Test { options.db_paths.emplace_back(dbname_ + "_4", 0); EXPECT_OK(DestroyDB(dbname_, options)); EXPECT_OK(DestroyDB(snapshot_name_, options)); + test::DestroyDir(env_, export_path_); } // Return the current option configuration. @@ -140,6 +157,12 @@ class CheckpointTest : public testing::Test { ASSERT_OK(TryReopen(options)); } + void CompactAll() { + for (auto h : handles_) { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), h, nullptr, nullptr)); + } + } + void Close() { for (auto h : handles_) { delete h; @@ -289,6 +312,109 @@ TEST_F(CheckpointTest, GetSnapshotLink) { } } +TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) { + // Create a database + Status s; + auto options = CurrentOptions(); + options.create_if_missing = true; + CreateAndReopenWithCF({}, options); + + // Helper to verify the number of files in metadata and export dir + auto verify_files_exported = [&](const ExportImportFilesMetaData& metadata, + int num_files_expected) { + ASSERT_EQ(metadata.files.size(), num_files_expected); + std::vector subchildren; + env_->GetChildren(export_path_, &subchildren); + int num_children = 0; + for (const auto& child : subchildren) { + if (child != "." && child != "..") { + ++num_children; + } + } + ASSERT_EQ(num_children, num_files_expected); + }; + + // Test DefaultColumnFamily + { + const auto key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export the Tables and verify + ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_)); + verify_files_exported(*metadata_, 1); + ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name()); + test::DestroyDir(env_, export_path_); + delete metadata_; + metadata_ = nullptr; + + // Check again after compaction + CompactAll(); + ASSERT_OK(Put(key, "v2")); + ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_)); + verify_files_exported(*metadata_, 2); + ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name()); + test::DestroyDir(env_, export_path_); + delete metadata_; + metadata_ = nullptr; + delete checkpoint; + } + + // Test non default column family with non default comparator + { + auto cf_options = CurrentOptions(); + cf_options.comparator = ReverseBytewiseComparator(); + ASSERT_OK( + db_->CreateColumnFamily(cf_options, "yoyo", &cfh_reverse_comp_)); + + const auto key = std::string("foo"); + ASSERT_OK(db_->Put(WriteOptions(), cfh_reverse_comp_, key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export the Tables and verify + ASSERT_OK(checkpoint->ExportColumnFamily(cfh_reverse_comp_, export_path_, + &metadata_)); + verify_files_exported(*metadata_, 1); + ASSERT_EQ(metadata_->db_comparator_name, + ReverseBytewiseComparator()->Name()); + delete checkpoint; + } +} + +TEST_F(CheckpointTest, ExportColumnFamilyNegativeTest) { + // Create a database + Status s; + auto options = CurrentOptions(); + options.create_if_missing = true; + CreateAndReopenWithCF({}, options); + + const auto key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export onto existing directory + env_->CreateDirIfMissing(export_path_); + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_), + Status::InvalidArgument("Specified export_dir exists")); + test::DestroyDir(env_, export_path_); + + // Export with invalid directory specification + export_path_ = ""; + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_), + Status::InvalidArgument("Specified export_dir invalid")); + delete checkpoint; +} + TEST_F(CheckpointTest, CheckpointCF) { Options options = CurrentOptions(); CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options);