Export Import sst files (#5495)

Summary:
Refresh of the earlier change here - https://github.com/facebook/rocksdb/issues/5135

This is a review request for code change needed for - https://github.com/facebook/rocksdb/issues/3469
"Add support for taking snapshot of a column family and creating column family from a given CF snapshot"

We have an implementation for this that we have been testing internally; the following two new APIs together provide this functionality.

(1) ExportColumnFamily() - This API is modeled after CreateCheckpoint(), as below.
// Exports all live SST files of a specified Column Family onto export_dir,
// returning SST files information in metadata.
// - SST files will be created as hard links when the directory specified
//   is in the same partition as the db directory, copied otherwise.
// - export_dir should not already exist and will be created by this API.
// - Always triggers a flush.
virtual Status ExportColumnFamily(ColumnFamilyHandle* handle,
                                  const std::string& export_dir,
                                  ExportImportFilesMetaData** metadata);

Internally, the API will call DisableFileDeletions() and GetColumnFamilyMetaData(), parse through the
metadata creating links/copies of all the SST files, call EnableFileDeletions(), and complete the call by
returning the list of file metadata.
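
For illustration, a minimal sketch of the export flow (hypothetical directory and variable names; assumes an already-open DB* db and ColumnFamilyHandle* cfh, error handling elided):

#include "rocksdb/utilities/checkpoint.h"

// Create a Checkpoint object bound to the open DB.
rocksdb::Checkpoint* checkpoint = nullptr;
rocksdb::Status s = rocksdb::Checkpoint::Create(db, &checkpoint);
rocksdb::ExportImportFilesMetaData* metadata = nullptr;
if (s.ok()) {
  // "/tmp/cf_export" must not exist yet; the API creates it.
  s = checkpoint->ExportColumnFamily(cfh, "/tmp/cf_export", &metadata);
}
// On success, metadata holds the comparator name and per-file information
// consumed later by CreateColumnFamilyWithImport(); the caller owns both
// objects and must delete them.
delete metadata;
delete checkpoint;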

(2) CreateColumnFamilyWithImport() - This API is modeled after IngestExternalFile(), but invoked only during a CF creation as below.
// CreateColumnFamilyWithImport() will create a new column family with
// column_family_name and import external SST files specified in metadata into
// this column family.
// (1) External SST files can be created using SstFileWriter.
// (2) External SST files can be exported from a particular column family in
//     an existing DB.
// Option in import_options specifies whether the external files are copied or
// moved (default is copy). When option specifies copy, managing files at
// external_file_path is caller's responsibility. When option specifies a
// move, the call ensures that the specified files at external_file_path are
// deleted on successful return and files are not modified on any error
// return.
// On error return, column family handle returned will be nullptr.
// ColumnFamily will be present on successful return and will not be present
// on error return. ColumnFamily may be present on any crash during this call.
virtual Status CreateColumnFamilyWithImport(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    const ImportColumnFamilyOptions& import_options,
    const ExportImportFilesMetaData& metadata,
    ColumnFamilyHandle** handle);

Internally, this API creates a new CF, parses all the SST files, and adds them to the specified column family at the same level and with the same sequence numbers as in the metadata. It also performs safety checks with respect to overlaps between the SST files being imported.

If the incoming sequence number is higher than the current local sequence number, the local sequence
number is updated to reflect this.

Note: as the SST files are being moved across column families, the column family name in an SST file
will no longer match the actual column family on the destination DB. The API does not modify the column
family name or ID in the SST files being imported.
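
For illustration, the matching import side (hypothetical CF name; assumes the metadata returned by ExportColumnFamily() above and an open destination DB* db, error handling elided):

rocksdb::ImportColumnFamilyOptions import_options;
import_options.move_files = false;  // copy the files; originals stay intact
rocksdb::ColumnFamilyHandle* new_cfh = nullptr;
rocksdb::Status s = db->CreateColumnFamilyWithImport(
    rocksdb::ColumnFamilyOptions(), "imported_cf", import_options, *metadata,
    &new_cfh);
// On failure new_cfh remains nullptr and the column family is not created.
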
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5495

Differential Revision: D16018881

fbshipit-source-id: 9ae2251025d5916d35a9fc4ea4d6707f6be16ff9
Branch: main
Author: Venki Pallipadi (committed by Facebook GitHub Bot)
parent a3c1832e86
commit 22ce462450
Changed files (lines changed):
  1. CMakeLists.txt (1)
  2. Makefile (5)
  3. TARGETS (1)
  4. db/compacted_db_impl.h (9)
  5. db/db_impl/db_impl.cc (122)
  6. db/db_impl/db_impl.h (11)
  7. db/db_impl/db_impl_readonly.h (10)
  8. db/db_test.cc (10)
  9. db/import_column_family_job.cc (257)
 10. db/import_column_family_job.h (70)
 11. db/import_column_family_test.cc (565)
 12. include/rocksdb/db.h (21)
 13. include/rocksdb/metadata.h (7)
 14. include/rocksdb/options.h (6)
 15. include/rocksdb/utilities/checkpoint.h (14)
 16. include/rocksdb/utilities/stackable_db.h (10)
 17. src.mk (1)
 18. utilities/checkpoint/checkpoint_impl.cc (185)
 19. utilities/checkpoint/checkpoint_impl.h (23)
 20. utilities/checkpoint/checkpoint_test.cc (126)

@@ -520,6 +520,7 @@ set(SOURCES
 db/flush_job.cc
 db/flush_scheduler.cc
 db/forward_iterator.cc
+db/import_column_family_job.cc
 db/internal_stats.cc
 db/logs_with_prep_tracker.cc
 db/log_reader.cc

@@ -500,6 +500,7 @@ TESTS = \
 	plain_table_db_test \
 	comparator_db_test \
 	external_sst_file_test \
+	import_column_family_test \
 	prefix_test \
 	skiplist_test \
 	write_buffer_manager_test \
@@ -577,6 +578,7 @@ PARALLEL_TEST = \
 	db_universal_compaction_test \
 	db_wal_test \
 	external_sst_file_test \
+	import_column_family_test \
 	fault_injection_test \
 	inlineskiplist_test \
 	manual_compaction_test \
@@ -1274,6 +1276,9 @@ external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util.
 external_sst_file_test: db/external_sst_file_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
+import_column_family_test: db/import_column_family_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
+
 db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)

@@ -113,6 +113,7 @@ cpp_library(
         "db/flush_job.cc",
         "db/flush_scheduler.cc",
         "db/forward_iterator.cc",
+        "db/import_column_family_job.cc",
         "db/internal_stats.cc",
         "db/log_reader.cc",
         "db/log_writer.cc",

@@ -85,6 +85,15 @@ class CompactedDBImpl : public DBImpl {
       const IngestExternalFileOptions& /*ingestion_options*/) override {
     return Status::NotSupported("Not supported in compacted db mode.");
   }
+  using DB::CreateColumnFamilyWithImport;
+  virtual Status CreateColumnFamilyWithImport(
+      const ColumnFamilyOptions& /*options*/,
+      const std::string& /*column_family_name*/,
+      const ImportColumnFamilyOptions& /*import_options*/,
+      const ExportImportFilesMetaData& /*metadata*/,
+      ColumnFamilyHandle** /*handle*/) override {
+    return Status::NotSupported("Not supported in compacted db mode.");
+  }
 
  private:
  friend class DB;

@@ -33,6 +33,7 @@
 #include "db/error_handler.h"
 #include "db/event_helpers.h"
 #include "db/external_sst_file_ingestion_job.h"
+#include "db/import_column_family_job.h"
 #include "db/flush_job.h"
 #include "db/forward_iterator.h"
 #include "db/job_context.h"
@@ -3894,6 +3895,127 @@ Status DBImpl::IngestExternalFiles(
   return status;
 }
 
+Status DBImpl::CreateColumnFamilyWithImport(
+    const ColumnFamilyOptions& options, const std::string& column_family_name,
+    const ImportColumnFamilyOptions& import_options,
+    const ExportImportFilesMetaData& metadata,
+    ColumnFamilyHandle** handle) {
+  assert(handle != nullptr);
+  assert(*handle == nullptr);
+  std::string cf_comparator_name = options.comparator->Name();
+  if (cf_comparator_name != metadata.db_comparator_name) {
+    return Status::InvalidArgument("Comparator name mismatch");
+  }
+
+  // Create column family.
+  auto status = CreateColumnFamily(options, column_family_name, handle);
+  if (!status.ok()) {
+    return status;
+  }
+
+  // Import sst files from metadata.
+  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(*handle);
+  auto cfd = cfh->cfd();
+  ImportColumnFamilyJob import_job(env_, versions_.get(), cfd,
+                                   immutable_db_options_, env_options_,
+                                   import_options, metadata.files);
+
+  SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
+  VersionEdit dummy_edit;
+  uint64_t next_file_number = 0;
+  std::list<uint64_t>::iterator pending_output_elem;
+  {
+    // Lock db mutex
+    InstrumentedMutexLock l(&mutex_);
+    if (error_handler_.IsDBStopped()) {
+      // Don't import files when there is a bg_error
+      status = error_handler_.GetBGError();
+    }
+
+    // Make sure that bg cleanup wont delete the files that we are importing
+    pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
+
+    if (status.ok()) {
+      // If crash happen after a hard link established, Recover function may
+      // reuse the file number that has already assigned to the internal file,
+      // and this will overwrite the external file. To protect the external
+      // file, we have to make sure the file number will never being reused.
+      next_file_number = versions_->FetchAddFileNumber(metadata.files.size());
+      auto cf_options = cfd->GetLatestMutableCFOptions();
+      status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
+                                      directories_.GetDbDir());
+      if (status.ok()) {
+        InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
+      }
+    }
+  }
+  dummy_sv_ctx.Clean();
+
+  if (status.ok()) {
+    SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
+    status = import_job.Prepare(next_file_number, sv);
+    CleanupSuperVersion(sv);
+  }
+
+  if (status.ok()) {
+    SuperVersionContext sv_context(true /*create_superversion*/);
+    {
+      // Lock db mutex
+      InstrumentedMutexLock l(&mutex_);
+
+      // Stop writes to the DB by entering both write threads
+      WriteThread::Writer w;
+      write_thread_.EnterUnbatched(&w, &mutex_);
+      WriteThread::Writer nonmem_w;
+      if (two_write_queues_) {
+        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
+      }
+
+      num_running_ingest_file_++;
+      assert(!cfd->IsDropped());
+      status = import_job.Run();
+
+      // Install job edit [Mutex will be unlocked here]
+      if (status.ok()) {
+        auto cf_options = cfd->GetLatestMutableCFOptions();
+        status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
+                                        &mutex_, directories_.GetDbDir());
+        if (status.ok()) {
+          InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
+        }
+      }
+
+      // Resume writes to the DB
+      if (two_write_queues_) {
+        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
+      }
+      write_thread_.ExitUnbatched(&w);
+
+      num_running_ingest_file_--;
+      if (num_running_ingest_file_ == 0) {
+        bg_cv_.SignalAll();
+      }
+    }
+    // mutex_ is unlocked here
+    sv_context.Clean();
+  }
+
+  {
+    InstrumentedMutexLock l(&mutex_);
+    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
+  }
+
+  import_job.Cleanup(status);
+  if (!status.ok()) {
+    DropColumnFamily(*handle);
+    DestroyColumnFamilyHandle(*handle);
+    *handle = nullptr;
+  }
+  return status;
+}
+
 Status DBImpl::VerifyChecksum() {
   Status s;
   std::vector<ColumnFamilyData*> cfd_list;

@@ -27,6 +27,7 @@
 #include "db/external_sst_file_ingestion_job.h"
 #include "db/flush_job.h"
 #include "db/flush_scheduler.h"
+#include "db/import_column_family_job.h"
 #include "db/internal_stats.h"
 #include "db/log_writer.h"
 #include "db/logs_with_prep_tracker.h"
@@ -356,6 +357,13 @@ class DBImpl : public DB {
   virtual Status IngestExternalFiles(
       const std::vector<IngestExternalFileArg>& args) override;
 
+  using DB::CreateColumnFamilyWithImport;
+  virtual Status CreateColumnFamilyWithImport(
+      const ColumnFamilyOptions& options, const std::string& column_family_name,
+      const ImportColumnFamilyOptions& import_options,
+      const ExportImportFilesMetaData& metadata,
+      ColumnFamilyHandle** handle) override;
+
   virtual Status VerifyChecksum() override;
 
   using DB::StartTrace;
@@ -1803,7 +1811,8 @@ class DBImpl : public DB {
   std::string db_absolute_path_;
 
-  // Number of running IngestExternalFile() calls.
+  // Number of running IngestExternalFile() or CreateColumnFamilyWithImport()
+  // calls.
   // REQUIRES: mutex held
   int num_running_ingest_file_;

@@ -115,6 +115,16 @@ class DBImplReadOnly : public DBImpl {
     return Status::NotSupported("Not supported operation in read only mode.");
   }
 
+  using DB::CreateColumnFamilyWithImport;
+  virtual Status CreateColumnFamilyWithImport(
+      const ColumnFamilyOptions& /*options*/,
+      const std::string& /*column_family_name*/,
+      const ImportColumnFamilyOptions& /*import_options*/,
+      const ExportImportFilesMetaData& /*metadata*/,
+      ColumnFamilyHandle** /*handle*/) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
  private:
  friend class DB;

@@ -2492,6 +2492,16 @@ class ModelDB : public DB {
     return Status::NotSupported("Not implemented");
   }
 
+  using DB::CreateColumnFamilyWithImport;
+  virtual Status CreateColumnFamilyWithImport(
+      const ColumnFamilyOptions& /*options*/,
+      const std::string& /*column_family_name*/,
+      const ImportColumnFamilyOptions& /*import_options*/,
+      const ExportImportFilesMetaData& /*metadata*/,
+      ColumnFamilyHandle** /*handle*/) override {
+    return Status::NotSupported("Not implemented.");
+  }
+
   Status VerifyChecksum() override {
     return Status::NotSupported("Not implemented.");
   }

@@ -0,0 +1,257 @@ db/import_column_family_job.cc (new file)
#ifndef ROCKSDB_LITE

#include "db/import_column_family_job.h"

#include <cinttypes>
#include <algorithm>
#include <string>
#include <vector>

#include "db/version_edit.h"
#include "file/file_util.h"
#include "table/merging_iterator.h"
#include "table/scoped_arena_iterator.h"
#include "table/sst_file_writer_collectors.h"
#include "table/table_builder.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h"

namespace rocksdb {

Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
                                      SuperVersion* sv) {
  Status status;

  // Read the information of files we are importing
  for (const auto& file_metadata : metadata_) {
    const auto file_path = file_metadata.db_path + "/" + file_metadata.name;
    IngestedFileInfo file_to_import;
    status = GetIngestedFileInfo(file_path, &file_to_import, sv);
    if (!status.ok()) {
      return status;
    }
    files_to_import_.push_back(file_to_import);
  }

  const auto ucmp = cfd_->internal_comparator().user_comparator();
  auto num_files = files_to_import_.size();
  if (num_files == 0) {
    return Status::InvalidArgument("The list of files is empty");
  } else if (num_files > 1) {
    // Verify that passed files don't have overlapping ranges in any particular
    // level.
    int min_level = 1;  // Check for overlaps in Level 1 and above.
    int max_level = -1;
    for (const auto& file_metadata : metadata_) {
      if (file_metadata.level > max_level) {
        max_level = file_metadata.level;
      }
    }
    for (int level = min_level; level <= max_level; ++level) {
      autovector<const IngestedFileInfo*> sorted_files;
      for (size_t i = 0; i < num_files; i++) {
        if (metadata_[i].level == level) {
          sorted_files.push_back(&files_to_import_[i]);
        }
      }
      std::sort(sorted_files.begin(), sorted_files.end(),
                [&ucmp](const IngestedFileInfo* info1,
                        const IngestedFileInfo* info2) {
                  return ucmp->Compare(info1->smallest_user_key,
                                       info2->smallest_user_key) < 0;
                });

      for (size_t i = 0; i < sorted_files.size() - 1; i++) {
        if (ucmp->Compare(sorted_files[i]->largest_user_key,
                          sorted_files[i + 1]->smallest_user_key) >= 0) {
          return Status::InvalidArgument("Files have overlapping ranges");
        }
      }
    }
  }

  for (const auto& f : files_to_import_) {
    if (f.num_entries == 0) {
      return Status::InvalidArgument("File contain no entries");
    }

    if (!f.smallest_internal_key().Valid() ||
        !f.largest_internal_key().Valid()) {
      return Status::Corruption("File has corrupted keys");
    }
  }

  // Copy/Move external files into DB
  auto hardlink_files = import_options_.move_files;
  for (auto& f : files_to_import_) {
    f.fd = FileDescriptor(next_file_number++, 0, f.file_size);

    const auto path_outside_db = f.external_file_path;
    const auto path_inside_db = TableFileName(
        cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId());

    if (hardlink_files) {
      status = env_->LinkFile(path_outside_db, path_inside_db);
      if (status.IsNotSupported()) {
        // Original file is on a different FS, use copy instead of hard linking
        hardlink_files = false;
      }
    }
    if (!hardlink_files) {
      status = CopyFile(env_, path_outside_db, path_inside_db, 0,
                        db_options_.use_fsync);
    }
    if (!status.ok()) {
      break;
    }
    f.copy_file = !hardlink_files;
    f.internal_file_path = path_inside_db;
  }

  if (!status.ok()) {
    // We failed, remove all files that we copied into the db
    for (const auto& f : files_to_import_) {
      if (f.internal_file_path.empty()) {
        break;
      }
      const auto s = env_->DeleteFile(f.internal_file_path);
      if (!s.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "AddFile() clean up for file %s failed : %s",
                       f.internal_file_path.c_str(), s.ToString().c_str());
      }
    }
  }

  return status;
}

// REQUIRES: we have become the only writer by entering both write_thread_ and
// nonmem_write_thread_
Status ImportColumnFamilyJob::Run() {
  Status status;
  edit_.SetColumnFamily(cfd_->GetID());

  for (size_t i = 0; i < files_to_import_.size(); ++i) {
    const auto& f = files_to_import_[i];
    const auto& file_metadata = metadata_[i];
    edit_.AddFile(file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(),
                  f.fd.GetFileSize(), f.smallest_internal_key(),
                  f.largest_internal_key(), file_metadata.smallest_seqno,
                  file_metadata.largest_seqno, false);

    // If incoming sequence number is higher, update local sequence number.
    if (file_metadata.largest_seqno > versions_->LastSequence()) {
      versions_->SetLastAllocatedSequence(file_metadata.largest_seqno);
      versions_->SetLastPublishedSequence(file_metadata.largest_seqno);
      versions_->SetLastSequence(file_metadata.largest_seqno);
    }
  }

  return status;
}

void ImportColumnFamilyJob::Cleanup(const Status& status) {
  if (!status.ok()) {
    // We failed to add files to the database remove all the files we copied.
    for (const auto& f : files_to_import_) {
      const auto s = env_->DeleteFile(f.internal_file_path);
      if (!s.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "AddFile() clean up for file %s failed : %s",
                       f.internal_file_path.c_str(), s.ToString().c_str());
      }
    }
  } else if (status.ok() && import_options_.move_files) {
    // The files were moved and added successfully, remove original file links
    for (IngestedFileInfo& f : files_to_import_) {
      const auto s = env_->DeleteFile(f.external_file_path);
      if (!s.ok()) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "%s was added to DB successfully but failed to remove original "
            "file link : %s",
            f.external_file_path.c_str(), s.ToString().c_str());
      }
    }
  }
}

Status ImportColumnFamilyJob::GetIngestedFileInfo(
    const std::string& external_file, IngestedFileInfo* file_to_import,
    SuperVersion* sv) {
  file_to_import->external_file_path = external_file;

  // Get external file size
  auto status = env_->GetFileSize(external_file, &file_to_import->file_size);
  if (!status.ok()) {
    return status;
  }

  // Create TableReader for external file
  std::unique_ptr<TableReader> table_reader;
  std::unique_ptr<RandomAccessFile> sst_file;
  std::unique_ptr<RandomAccessFileReader> sst_file_reader;

  status = env_->NewRandomAccessFile(external_file, &sst_file, env_options_);
  if (!status.ok()) {
    return status;
  }
  sst_file_reader.reset(
      new RandomAccessFileReader(std::move(sst_file), external_file));

  status = cfd_->ioptions()->table_factory->NewTableReader(
      TableReaderOptions(*cfd_->ioptions(),
                         sv->mutable_cf_options.prefix_extractor.get(),
                         env_options_, cfd_->internal_comparator()),
      std::move(sst_file_reader), file_to_import->file_size, &table_reader);
  if (!status.ok()) {
    return status;
  }

  // Get the external file properties
  auto props = table_reader->GetTableProperties();

  // Set original_seqno to 0.
  file_to_import->original_seqno = 0;

  // Get number of entries in table
  file_to_import->num_entries = props->num_entries;

  ParsedInternalKey key;
  ReadOptions ro;
  // During reading the external file we can cache blocks that we read into
  // the block cache, if we later change the global seqno of this file, we will
  // have block in cache that will include keys with wrong seqno.
  // We need to disable fill_cache so that we read from the file without
  // updating the block cache.
  ro.fill_cache = false;
  std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
      ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
      /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));

  // Get first (smallest) key from file
  iter->SeekToFirst();
  if (!ParseInternalKey(iter->key(), &key)) {
    return Status::Corruption("external file have corrupted keys");
  }
  file_to_import->smallest_user_key = key.user_key.ToString();

  // Get last (largest) key from file
  iter->SeekToLast();
  if (!ParseInternalKey(iter->key(), &key)) {
    return Status::Corruption("external file have corrupted keys");
  }
  file_to_import->largest_user_key = key.user_key.ToString();

  file_to_import->cf_id = static_cast<uint32_t>(props->column_family_id);

  file_to_import->table_properties = *props;

  return status;
}

}  // namespace rocksdb

#endif  // !ROCKSDB_LITE

@@ -0,0 +1,70 @@ db/import_column_family_job.h (new file)
#pragma once

#include <string>
#include <unordered_set>
#include <vector>

#include "db/column_family.h"
#include "db/dbformat.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/snapshot_impl.h"
#include "options/db_options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/metadata.h"
#include "rocksdb/sst_file_writer.h"
#include "util/autovector.h"

namespace rocksdb {

// Imports a set of sst files as is into a new column family. Logic is similar
// to ExternalSstFileIngestionJob.
class ImportColumnFamilyJob {
 public:
  ImportColumnFamilyJob(Env* env, VersionSet* versions, ColumnFamilyData* cfd,
                        const ImmutableDBOptions& db_options,
                        const EnvOptions& env_options,
                        const ImportColumnFamilyOptions& import_options,
                        const std::vector<LiveFileMetaData>& metadata)
      : env_(env),
        versions_(versions),
        cfd_(cfd),
        db_options_(db_options),
        env_options_(env_options),
        import_options_(import_options),
        metadata_(metadata) {}

  // Prepare the job by copying external files into the DB.
  Status Prepare(uint64_t next_file_number, SuperVersion* sv);

  // Will execute the import job and prepare edit() to be applied.
  // REQUIRES: Mutex held
  Status Run();

  // Cleanup after successful/failed job
  void Cleanup(const Status& status);

  VersionEdit* edit() { return &edit_; }

  const autovector<IngestedFileInfo>& files_to_import() const {
    return files_to_import_;
  }

 private:
  // Open the external file and populate `file_to_import` with all the
  // external information we need to import this file.
  Status GetIngestedFileInfo(const std::string& external_file,
                             IngestedFileInfo* file_to_import,
                             SuperVersion* sv);

  Env* env_;
  VersionSet* versions_;
  ColumnFamilyData* cfd_;
  const ImmutableDBOptions& db_options_;
  const EnvOptions& env_options_;
  autovector<IngestedFileInfo> files_to_import_;
  VersionEdit edit_;
  const ImportColumnFamilyOptions& import_options_;
  std::vector<LiveFileMetaData> metadata_;
};

}  // namespace rocksdb

@@ -0,0 +1,565 @@ db/import_column_family_test.cc (new file)
#ifndef ROCKSDB_LITE

#include <functional>
#include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/sst_file_writer.h"
#include "test_util/testutil.h"

namespace rocksdb {

class ImportColumnFamilyTest : public DBTestBase {
 public:
  ImportColumnFamilyTest() : DBTestBase("/import_column_family_test") {
    sst_files_dir_ = dbname_ + "/sst_files/";
    DestroyAndRecreateExternalSSTFilesDir();
    export_files_dir_ = test::TmpDir(env_) + "/export";
    import_cfh_ = nullptr;
    import_cfh2_ = nullptr;
    metadata_ptr_ = nullptr;
  }

  ~ImportColumnFamilyTest() {
    if (import_cfh_) {
      db_->DropColumnFamily(import_cfh_);
      db_->DestroyColumnFamilyHandle(import_cfh_);
      import_cfh_ = nullptr;
    }
    if (import_cfh2_) {
      db_->DropColumnFamily(import_cfh2_);
      db_->DestroyColumnFamilyHandle(import_cfh2_);
      import_cfh2_ = nullptr;
    }
    if (metadata_ptr_) {
      delete metadata_ptr_;
      metadata_ptr_ = nullptr;
    }
    test::DestroyDir(env_, sst_files_dir_);
    test::DestroyDir(env_, export_files_dir_);
  }

  void DestroyAndRecreateExternalSSTFilesDir() {
    test::DestroyDir(env_, sst_files_dir_);
    env_->CreateDir(sst_files_dir_);
    test::DestroyDir(env_, export_files_dir_);
  }

  LiveFileMetaData LiveFileMetaDataInit(std::string name, std::string path,
                                        int level,
                                        SequenceNumber smallest_seqno,
                                        SequenceNumber largest_seqno) {
    LiveFileMetaData metadata;
    metadata.name = name;
    metadata.db_path = path;
    metadata.smallest_seqno = smallest_seqno;
    metadata.largest_seqno = largest_seqno;
    metadata.level = level;
    return metadata;
  }

 protected:
  std::string sst_files_dir_;
  std::string export_files_dir_;
  ColumnFamilyHandle* import_cfh_;
  ColumnFamilyHandle* import_cfh2_;
  ExportImportFilesMetaData* metadata_ptr_;
};

TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFiles) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"koko"}, options);

  SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
  SstFileWriter sfw_unknown(EnvOptions(), options);

  // cf1.sst
  const std::string cf1_sst_name = "cf1.sst";
  const std::string cf1_sst = sst_files_dir_ + cf1_sst_name;
  ASSERT_OK(sfw_cf1.Open(cf1_sst));
  ASSERT_OK(sfw_cf1.Put("K1", "V1"));
  ASSERT_OK(sfw_cf1.Put("K2", "V2"));
  ASSERT_OK(sfw_cf1.Finish());

  // cf_unknown.sst
  const std::string unknown_sst_name = "cf_unknown.sst";
  const std::string unknown_sst = sst_files_dir_ + unknown_sst_name;
  ASSERT_OK(sfw_unknown.Open(unknown_sst));
  ASSERT_OK(sfw_unknown.Put("K3", "V1"));
  ASSERT_OK(sfw_unknown.Put("K4", "V2"));
  ASSERT_OK(sfw_unknown.Finish());

  {
    // Import sst file corresponding to cf1 onto a new cf and verify
    ExportImportFilesMetaData metadata;
    metadata.files.push_back(
        LiveFileMetaDataInit(cf1_sst_name, sst_files_dir_, 0, 10, 19));
    metadata.db_comparator_name = options.comparator->Name();

    ASSERT_OK(db_->CreateColumnFamilyWithImport(
        options, "toto", ImportColumnFamilyOptions(), metadata, &import_cfh_));
    ASSERT_NE(import_cfh_, nullptr);

    std::string value;
    db_->Get(ReadOptions(), import_cfh_, "K1", &value);
    ASSERT_EQ(value, "V1");
    db_->Get(ReadOptions(), import_cfh_, "K2", &value);
    ASSERT_EQ(value, "V2");
    ASSERT_OK(db_->DropColumnFamily(import_cfh_));
    ASSERT_OK(db_->DestroyColumnFamilyHandle(import_cfh_));
    import_cfh_ = nullptr;
  }

  {
    // Import sst file corresponding to unknown cf onto a new cf and verify
    ExportImportFilesMetaData metadata;
    metadata.files.push_back(
        LiveFileMetaDataInit(unknown_sst_name, sst_files_dir_, 0, 20, 29));
    metadata.db_comparator_name = options.comparator->Name();

    ASSERT_OK(db_->CreateColumnFamilyWithImport(
        options, "yoyo", ImportColumnFamilyOptions(), metadata, &import_cfh_));
    ASSERT_NE(import_cfh_, nullptr);

    std::string value;
    db_->Get(ReadOptions(), import_cfh_, "K3", &value);
    ASSERT_EQ(value, "V1");
    db_->Get(ReadOptions(), import_cfh_, "K4", &value);
    ASSERT_EQ(value, "V2");
  }
}

TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFilesWithOverlap) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"koko"}, options);

  SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);

  // file3.sst
  const std::string file3_sst_name = "file3.sst";
  const std::string file3_sst = sst_files_dir_ + file3_sst_name;
  ASSERT_OK(sfw_cf1.Open(file3_sst));
  for (int i = 0; i < 100; ++i) {
    sfw_cf1.Put(Key(i), Key(i) + "_val");
  }
  ASSERT_OK(sfw_cf1.Finish());

  // file2.sst
  const std::string file2_sst_name = "file2.sst";
  const std::string file2_sst = sst_files_dir_ + file2_sst_name;
  ASSERT_OK(sfw_cf1.Open(file2_sst));
  for (int i = 0; i < 100; i += 2) {
    sfw_cf1.Put(Key(i), Key(i) + "_overwrite1");
  }
  ASSERT_OK(sfw_cf1.Finish());

  // file1a.sst
  const std::string file1a_sst_name = "file1a.sst";
  const std::string file1a_sst = sst_files_dir_ + file1a_sst_name;
  ASSERT_OK(sfw_cf1.Open(file1a_sst));
  for (int i = 0; i < 52; i += 4) {
    sfw_cf1.Put(Key(i), Key(i) + "_overwrite2");
  }
  ASSERT_OK(sfw_cf1.Finish());

  // file1b.sst
  const std::string file1b_sst_name = "file1b.sst";
  const std::string file1b_sst = sst_files_dir_ + file1b_sst_name;
  ASSERT_OK(sfw_cf1.Open(file1b_sst));
  for (int i = 52; i < 100; i += 4) {
    sfw_cf1.Put(Key(i), Key(i) + "_overwrite2");
  }
  ASSERT_OK(sfw_cf1.Finish());

  // file0a.sst
  const std::string file0a_sst_name = "file0a.sst";
  const std::string file0a_sst = sst_files_dir_ + file0a_sst_name;
  ASSERT_OK(sfw_cf1.Open(file0a_sst));
  for (int i = 0; i < 100; i += 16) {
    sfw_cf1.Put(Key(i), Key(i) + "_overwrite3");
  }
  ASSERT_OK(sfw_cf1.Finish());

  // file0b.sst
  const std::string file0b_sst_name = "file0b.sst";
  const std::string file0b_sst = sst_files_dir_ + file0b_sst_name;
  ASSERT_OK(sfw_cf1.Open(file0b_sst));
  for (int i = 0; i < 100; i += 16) {
    sfw_cf1.Put(Key(i), Key(i) + "_overwrite4");
  }
  ASSERT_OK(sfw_cf1.Finish());

  // Import sst files and verify
  ExportImportFilesMetaData metadata;
  metadata.files.push_back(
      LiveFileMetaDataInit(file3_sst_name, sst_files_dir_, 3, 10, 19));
  metadata.files.push_back(
      LiveFileMetaDataInit(file2_sst_name, sst_files_dir_, 2, 20, 29));
  metadata.files.push_back(
      LiveFileMetaDataInit(file1a_sst_name, sst_files_dir_, 1, 30, 34));
  metadata.files.push_back(
      LiveFileMetaDataInit(file1b_sst_name, sst_files_dir_, 1, 35, 39));
  metadata.files.push_back(
      LiveFileMetaDataInit(file0a_sst_name, sst_files_dir_, 0, 40, 49));
  metadata.files.push_back(
      LiveFileMetaDataInit(file0b_sst_name, sst_files_dir_, 0, 50, 59));
  metadata.db_comparator_name = options.comparator->Name();

  ASSERT_OK(db_->CreateColumnFamilyWithImport(
      options, "toto", ImportColumnFamilyOptions(), metadata, &import_cfh_));
  ASSERT_NE(import_cfh_, nullptr);

  for (int i = 0; i < 100; i++) {
    std::string value;
    db_->Get(ReadOptions(), import_cfh_, Key(i), &value);
    if (i % 16 == 0) {
      ASSERT_EQ(value, Key(i) + "_overwrite4");
    } else if (i % 4 == 0) {
      ASSERT_EQ(value, Key(i) + "_overwrite2");
    } else if (i % 2 == 0) {
      ASSERT_EQ(value, Key(i) + "_overwrite1");
    } else {
      ASSERT_EQ(value, Key(i) + "_val");
    }
  }

  for (int i = 0; i < 100; i += 5) {
    ASSERT_OK(
        db_->Put(WriteOptions(), import_cfh_, Key(i), Key(i) + "_overwrite5"));
  }

  // Flush and check again
  ASSERT_OK(db_->Flush(FlushOptions(), import_cfh_));
  for (int i = 0; i < 100; i++) {
    std::string value;
    db_->Get(ReadOptions(), import_cfh_, Key(i), &value);
    if (i % 5 == 0) {
      ASSERT_EQ(value, Key(i) + "_overwrite5");
    } else if (i % 16 == 0) {
      ASSERT_EQ(value, Key(i) + "_overwrite4");
    } else if (i % 4 == 0) {
      ASSERT_EQ(value, Key(i) + "_overwrite2");
    } else if (i % 2 == 0) {
      ASSERT_EQ(value, Key(i) + "_overwrite1");
    } else {
      ASSERT_EQ(value, Key(i) + "_val");
    }
  }

  // Compact and check again.
  ASSERT_OK(
      db_->CompactRange(CompactRangeOptions(), import_cfh_, nullptr, nullptr));
  for (int i = 0; i < 100; i++) {
    std::string value;
    db_->Get(ReadOptions(), import_cfh_, Key(i), &value);
    if (i % 5 == 0) {
      ASSERT_EQ(value, Key(i) + "_overwrite5");
    } else if (i % 16 == 0) {
      ASSERT_EQ(value, Key(i) + "_overwrite4");
    } else if (i % 4 == 0) {
      ASSERT_EQ(value, Key(i) + "_overwrite2");
    } else if (i % 2 == 0) {
      ASSERT_EQ(value, Key(i) + "_overwrite1");
    } else {
      ASSERT_EQ(value, Key(i) + "_val");
    }
  }
}

TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherCF) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"koko"}, options);

  for (int i = 0; i < 100; ++i) {
    Put(1, Key(i), Key(i) + "_val");
  }
  ASSERT_OK(Flush(1));

  ASSERT_OK(
      db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));

  // Overwrite the value in the same set of keys.
  for (int i = 0; i < 100; ++i) {
    Put(1, Key(i), Key(i) + "_overwrite");
  }

  // Flush to create L0 file.
  ASSERT_OK(Flush(1));
  for (int i = 0; i < 100; ++i) {
    Put(1, Key(i), Key(i) + "_overwrite2");
  }

  // Flush again to create another L0 file. It should have higher sequencer.
  ASSERT_OK(Flush(1));

  Checkpoint* checkpoint;
  ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
  ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_,
                                           &metadata_ptr_));
  ASSERT_NE(metadata_ptr_, nullptr);

  ImportColumnFamilyOptions import_options;
  import_options.move_files = false;
  ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "toto", import_options,
                                              *metadata_ptr_, &import_cfh_));
  ASSERT_NE(import_cfh_, nullptr);

  import_options.move_files = true;
  ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "yoyo", import_options,
                                              *metadata_ptr_, &import_cfh2_));
  ASSERT_NE(import_cfh2_, nullptr);
  delete metadata_ptr_;
  metadata_ptr_ = NULL;

  std::string value1, value2;

  for (int i = 0; i < 100; ++i) {
    db_->Get(ReadOptions(), import_cfh_, Key(i), &value1);
    ASSERT_EQ(Get(1, Key(i)), value1);
  }

  for (int i = 0; i < 100; ++i) {
    db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2);
    ASSERT_EQ(Get(1, Key(i)), value2);
  }

  // Modify keys in cf1 and verify.
  for (int i = 0; i < 25; i++) {
    ASSERT_OK(db_->Delete(WriteOptions(), import_cfh_, Key(i)));
  }
  for (int i = 25; i < 50; i++) {
    ASSERT_OK(
        db_->Put(WriteOptions(), import_cfh_, Key(i), Key(i) + "_overwrite3"));
  }
  for (int i = 0; i < 25; ++i) {
    ASSERT_TRUE(
        db_->Get(ReadOptions(), import_cfh_, Key(i), &value1).IsNotFound());
  }
  for (int i = 25; i < 50; ++i) {
    db_->Get(ReadOptions(), import_cfh_, Key(i), &value1);
    ASSERT_EQ(Key(i) + "_overwrite3", value1);
  }
  for (int i = 50; i < 100; ++i) {
    db_->Get(ReadOptions(), import_cfh_, Key(i), &value1);
    ASSERT_EQ(Key(i) + "_overwrite2", value1);
  }
  for (int i = 0; i < 100; ++i) {
    db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2);
    ASSERT_EQ(Get(1, Key(i)), value2);
  }

  // Compact and check again.
  ASSERT_OK(db_->Flush(FlushOptions(), import_cfh_));
  ASSERT_OK(
      db_->CompactRange(CompactRangeOptions(), import_cfh_, nullptr, nullptr));

  for (int i = 0; i < 25; ++i) {
    ASSERT_TRUE(
        db_->Get(ReadOptions(), import_cfh_, Key(i), &value1).IsNotFound());
  }
  for (int i = 25; i < 50; ++i) {
    db_->Get(ReadOptions(), import_cfh_, Key(i), &value1);
    ASSERT_EQ(Key(i) + "_overwrite3", value1);
  }
  for (int i = 50; i < 100; ++i) {
    db_->Get(ReadOptions(), import_cfh_, Key(i), &value1);
    ASSERT_EQ(Key(i) + "_overwrite2", value1);
  }
  for (int i = 0; i < 100; ++i) {
    db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2);
    ASSERT_EQ(Get(1, Key(i)), value2);
  }
}

TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherDB) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"koko"}, options);

  for (int i = 0; i < 100; ++i) {
    Put(1, Key(i), Key(i) + "_val");
  }
  ASSERT_OK(Flush(1));

  // Compact to create a L1 file.
  ASSERT_OK(
      db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));

  // Overwrite the value in the same set of keys.
  for (int i = 0; i < 50; ++i) {
    Put(1, Key(i), Key(i) + "_overwrite");
  }

  // Flush to create L0 file.
  ASSERT_OK(Flush(1));

  for (int i = 0; i < 25; ++i) {
    Put(1, Key(i), Key(i) + "_overwrite2");
  }

  // Flush again to create another L0 file. It should have higher sequencer.
  ASSERT_OK(Flush(1));

  Checkpoint* checkpoint;
  ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
  ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_,
                                           &metadata_ptr_));
  ASSERT_NE(metadata_ptr_, nullptr);

  // Create a new db and import the files.
  DB* db_copy;
  test::DestroyDir(env_, dbname_ + "/db_copy");
  ASSERT_OK(DB::Open(options, dbname_ + "/db_copy", &db_copy));
  ColumnFamilyHandle* cfh = nullptr;
  ASSERT_OK(db_copy->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
                                                  ImportColumnFamilyOptions(),
                                                  *metadata_ptr_, &cfh));
  ASSERT_NE(cfh, nullptr);

  for (int i = 0; i < 100; ++i) {
    std::string value;
    db_copy->Get(ReadOptions(), cfh, Key(i), &value);
    ASSERT_EQ(Get(1, Key(i)), value);
  }
  db_copy->DropColumnFamily(cfh);
  test::DestroyDir(env_, dbname_ + "/db_copy");
}

TEST_F(ImportColumnFamilyTest, ImportColumnFamilyNegativeTest) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"koko"}, options);

  {
    // Create column family with existing cf name.
    ExportImportFilesMetaData metadata;

    ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "koko",
                                                ImportColumnFamilyOptions(),
                                                metadata, &import_cfh_),
              Status::InvalidArgument("Column family already exists"));
    ASSERT_EQ(import_cfh_, nullptr);
  }

  {
    // Import with no files specified.
    ExportImportFilesMetaData metadata;

    ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
                                                ImportColumnFamilyOptions(),
                                                metadata, &import_cfh_),
              Status::InvalidArgument("The list of files is empty"));
    ASSERT_EQ(import_cfh_, nullptr);
  }

  {
    // Import with overlapping keys in sst files.
    ExportImportFilesMetaData metadata;
    SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
    const std::string file1_sst_name = "file1.sst";
    const std::string file1_sst = sst_files_dir_ + file1_sst_name;
    ASSERT_OK(sfw_cf1.Open(file1_sst));
    ASSERT_OK(sfw_cf1.Put("K1", "V1"));
    ASSERT_OK(sfw_cf1.Put("K2", "V2"));
    ASSERT_OK(sfw_cf1.Finish());
    const std::string file2_sst_name = "file2.sst";
    const std::string file2_sst = sst_files_dir_ + file2_sst_name;
    ASSERT_OK(sfw_cf1.Open(file2_sst));
    ASSERT_OK(sfw_cf1.Put("K2", "V2"));
    ASSERT_OK(sfw_cf1.Put("K3", "V3"));
    ASSERT_OK(sfw_cf1.Finish());

    metadata.files.push_back(
        LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19));
    metadata.files.push_back(
        LiveFileMetaDataInit(file2_sst_name, sst_files_dir_, 1, 10, 19));
    metadata.db_comparator_name = options.comparator->Name();

    ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
                                                ImportColumnFamilyOptions(),
                                                metadata, &import_cfh_),
              Status::InvalidArgument("Files have overlapping ranges"));
    ASSERT_EQ(import_cfh_, nullptr);
  }

  {
    // Import with a mismatching comparator, should fail with appropriate error.
    ExportImportFilesMetaData metadata;
    Options mismatch_options = CurrentOptions();
    mismatch_options.comparator = ReverseBytewiseComparator();
    SstFileWriter sfw_cf1(EnvOptions(), mismatch_options, handles_[1]);
    const std::string file1_sst_name = "file1.sst";
    const std::string file1_sst = sst_files_dir_ + file1_sst_name;
    ASSERT_OK(sfw_cf1.Open(file1_sst));
    ASSERT_OK(sfw_cf1.Put("K2", "V2"));
    ASSERT_OK(sfw_cf1.Put("K1", "V1"));
    ASSERT_OK(sfw_cf1.Finish());

    metadata.files.push_back(
        LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19));
    metadata.db_comparator_name = mismatch_options.comparator->Name();

    ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "coco",
                                                ImportColumnFamilyOptions(),
                                                metadata, &import_cfh_),
              Status::InvalidArgument("Comparator name mismatch"));
    ASSERT_EQ(import_cfh_, nullptr);
  }

  {
    // Import with non existent sst file should fail with appropriate error
    ExportImportFilesMetaData metadata;
    SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
    const std::string file1_sst_name = "file1.sst";
    const std::string file1_sst = sst_files_dir_ + file1_sst_name;
    ASSERT_OK(sfw_cf1.Open(file1_sst));
    ASSERT_OK(sfw_cf1.Put("K1", "V1"));
    ASSERT_OK(sfw_cf1.Put("K2", "V2"));
    ASSERT_OK(sfw_cf1.Finish());
    const std::string file3_sst_name = "file3.sst";

    metadata.files.push_back(
        LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19));
    metadata.files.push_back(
        LiveFileMetaDataInit(file3_sst_name, sst_files_dir_, 1, 10, 19));
    metadata.db_comparator_name = options.comparator->Name();

    ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
                                                ImportColumnFamilyOptions(),
                                                metadata, &import_cfh_),
              Status::IOError("No such file or directory"));
    ASSERT_EQ(import_cfh_, nullptr);

    // Test successful import after a failure with the same CF name. Ensures
    // there is no side effect with CF when there is a failed import
    metadata.files.pop_back();
    metadata.db_comparator_name = options.comparator->Name();

    ASSERT_OK(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
                                                ImportColumnFamilyOptions(),
                                                metadata, &import_cfh_));
    ASSERT_NE(import_cfh_, nullptr);
  }
}

}  // namespace rocksdb

int main(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

int main(int argc, char** argv) {
  fprintf(stderr,
          "SKIPPED as External SST File Writer and Import are not supported "
          "in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE

@@ -1174,6 +1174,27 @@ class DB {
   virtual Status IngestExternalFiles(
       const std::vector<IngestExternalFileArg>& args) = 0;
 
+  // CreateColumnFamilyWithImport() will create a new column family with
+  // column_family_name and import external SST files specified in metadata into
+  // this column family.
+  // (1) External SST files can be created using SstFileWriter.
+  // (2) External SST files can be exported from a particular column family in
+  //     an existing DB.
+  // Option in import_options specifies whether the external files are copied or
+  // moved (default is copy). When option specifies copy, managing files at
+  // external_file_path is caller's responsibility. When option specifies a
+  // move, the call ensures that the specified files at external_file_path are
+  // deleted on successful return and files are not modified on any error
+  // return.
+  // On error return, column family handle returned will be nullptr.
+  // ColumnFamily will be present on successful return and will not be present
+  // on error return. ColumnFamily may be present on any crash during this call.
+  virtual Status CreateColumnFamilyWithImport(
+      const ColumnFamilyOptions& options, const std::string& column_family_name,
+      const ImportColumnFamilyOptions& import_options,
+      const ExportImportFilesMetaData& metadata,
+      ColumnFamilyHandle** handle) = 0;
+
   virtual Status VerifyChecksum() = 0;
 
   // AddFile() is deprecated, please use IngestExternalFile()

@@ -108,4 +108,11 @@ struct LiveFileMetaData : SstFileMetaData {
   int level;  // Level at which this file resides.
   LiveFileMetaData() : column_family_name(), level(0) {}
 };
 
+// Metadata returned as output from ExportColumnFamily() and used as input to
+// CreateColumnFamilyWithImport().
+struct ExportImportFilesMetaData {
+  std::string db_comparator_name;       // Used to safety check at import.
+  std::vector<LiveFileMetaData> files;  // Vector of file metadata.
+};
 }  // namespace rocksdb

@@ -1491,4 +1491,10 @@ struct TraceOptions {
   uint64_t filter = kTraceFilterNone;
 };
 
+// ImportColumnFamilyOptions is used by ImportColumnFamily()
+struct ImportColumnFamilyOptions {
+  // Can be set to true to move the files instead of copying them.
+  bool move_files = false;
+};
 }  // namespace rocksdb

@@ -9,11 +9,15 @@
 #ifndef ROCKSDB_LITE
 
 #include <string>
+#include <vector>
 #include "rocksdb/status.h"
 
 namespace rocksdb {
 
 class DB;
+class ColumnFamilyHandle;
+struct LiveFileMetaData;
+struct ExportImportFilesMetaData;
 
 class Checkpoint {
  public:
@@ -36,6 +40,16 @@ class Checkpoint {
   virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
                                   uint64_t log_size_for_flush = 0);
 
+  // Exports all live SST files of a specified Column Family onto export_dir,
+  // returning SST files information in metadata.
+  // - SST files will be created as hard links when the directory specified
+  //   is in the same partition as the db directory, copied otherwise.
+  // - export_dir should not already exist and will be created by this API.
+  // - Always triggers a flush.
+  virtual Status ExportColumnFamily(ColumnFamilyHandle* handle,
+                                    const std::string& export_dir,
+                                    ExportImportFilesMetaData** metadata);
+
   virtual ~Checkpoint() {}
 };

@@ -120,6 +120,16 @@ class StackableDB : public DB {
     return db_->IngestExternalFiles(args);
   }
 
+  using DB::CreateColumnFamilyWithImport;
+  virtual Status CreateColumnFamilyWithImport(
+      const ColumnFamilyOptions& options, const std::string& column_family_name,
+      const ImportColumnFamilyOptions& import_options,
+      const ExportImportFilesMetaData& metadata,
+      ColumnFamilyHandle** handle) override {
+    return db_->CreateColumnFamilyWithImport(options, column_family_name,
+                                             import_options, metadata, handle);
+  }
+
   virtual Status VerifyChecksum() override { return db_->VerifyChecksum(); }
 
   using DB::KeyMayExist;

@@ -36,6 +36,7 @@ LIB_SOURCES = \
   db/flush_job.cc \
   db/flush_scheduler.cc \
   db/forward_iterator.cc \
+  db/import_column_family_job.cc \
   db/internal_stats.cc \
   db/logs_with_prep_tracker.cc \
   db/log_reader.cc \

@@ -22,6 +22,7 @@
 #include "port/port.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
+#include "rocksdb/metadata.h"
 #include "rocksdb/transaction_log.h"
 #include "rocksdb/utilities/checkpoint.h"
 #include "test_util/sync_point.h"
@@ -60,6 +61,12 @@ void CheckpointImpl::CleanStagingDirectory(
         full_private_path.c_str(), s.ToString().c_str());
 }
 
+Status Checkpoint::ExportColumnFamily(
+    ColumnFamilyHandle* /*handle*/, const std::string& /*export_dir*/,
+    ExportImportFilesMetaData** /*metadata*/) {
+  return Status::NotSupported("");
+}
+
 // Builds an openable snapshot of RocksDB
 Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
                                         uint64_t log_size_for_flush) {
@@ -322,6 +329,184 @@ Status CheckpointImpl::CreateCustomCheckpoint(
   return s;
 }
 
+// Exports all live SST files of a specified Column Family onto export_dir,
+// returning SST files information in metadata.
+Status CheckpointImpl::ExportColumnFamily(
+    ColumnFamilyHandle* handle, const std::string& export_dir,
+    ExportImportFilesMetaData** metadata) {
+  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(handle);
+  const auto cf_name = cfh->GetName();
+  const auto db_options = db_->GetDBOptions();
+
+  assert(metadata != nullptr);
+  assert(*metadata == nullptr);
+  auto s = db_->GetEnv()->FileExists(export_dir);
+  if (s.ok()) {
+    return Status::InvalidArgument("Specified export_dir exists");
+  } else if (!s.IsNotFound()) {
+    assert(s.IsIOError());
+    return s;
+  }
+
+  const auto final_nonslash_idx = export_dir.find_last_not_of('/');
+  if (final_nonslash_idx == std::string::npos) {
+    return Status::InvalidArgument("Specified export_dir invalid");
+  }
+  ROCKS_LOG_INFO(db_options.info_log,
+                 "[%s] export column family onto export directory %s",
+                 cf_name.c_str(), export_dir.c_str());
+
+  // Create a temporary export directory.
+  const auto tmp_export_dir =
+      export_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
+  s = db_->GetEnv()->CreateDir(tmp_export_dir);
+
+  if (s.ok()) {
+    s = db_->Flush(rocksdb::FlushOptions(), handle);
+  }
+
+  ColumnFamilyMetaData db_metadata;
+  if (s.ok()) {
+    // Export live sst files with file deletions disabled.
+    s = db_->DisableFileDeletions();
+    if (s.ok()) {
+      db_->GetColumnFamilyMetaData(handle, &db_metadata);
+
+      s = ExportFilesInMetaData(
+          db_options, db_metadata,
+          [&](const std::string& src_dirname, const std::string& fname) {
+            ROCKS_LOG_INFO(db_options.info_log, "[%s] HardLinking %s",
+                           cf_name.c_str(), fname.c_str());
+            return db_->GetEnv()->LinkFile(src_dirname + fname,
+                                           tmp_export_dir + fname);
+          } /*link_file_cb*/,
+          [&](const std::string& src_dirname, const std::string& fname) {
+            ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s",
+                           cf_name.c_str(), fname.c_str());
+            return CopyFile(db_->GetEnv(), src_dirname + fname,
+                            tmp_export_dir + fname, 0, db_options.use_fsync);
+          } /*copy_file_cb*/);
+
+      const auto enable_status = db_->EnableFileDeletions(false /*force*/);
+      if (s.ok()) {
+        s = enable_status;
+      }
+    }
+  }
+
+  auto moved_to_user_specified_dir = false;
+  if (s.ok()) {
+    // Move temporary export directory to the actual export directory.
+    s = db_->GetEnv()->RenameFile(tmp_export_dir, export_dir);
+  }
+
+  if (s.ok()) {
+    // Fsync export directory.
+    moved_to_user_specified_dir = true;
+    std::unique_ptr<Directory> dir_ptr;
+    s = db_->GetEnv()->NewDirectory(export_dir, &dir_ptr);
+    if (s.ok()) {
+      assert(dir_ptr != nullptr);
+      s = dir_ptr->Fsync();
+    }
+  }
+
+  if (s.ok()) {
+    // Export of files succeeded. Fill in the metadata information.
+    auto result_metadata = new ExportImportFilesMetaData();
+    result_metadata->db_comparator_name = handle->GetComparator()->Name();
+    for (const auto& level_metadata : db_metadata.levels) {
+      for (const auto& file_metadata : level_metadata.files) {
+        LiveFileMetaData live_file_metadata;
+        live_file_metadata.size = file_metadata.size;
+        live_file_metadata.name = std::move(file_metadata.name);
+        live_file_metadata.db_path = export_dir;
+        live_file_metadata.smallest_seqno = file_metadata.smallest_seqno;
+        live_file_metadata.largest_seqno = file_metadata.largest_seqno;
+        live_file_metadata.smallestkey = std::move(file_metadata.smallestkey);
+        live_file_metadata.largestkey = std::move(file_metadata.largestkey);
+        live_file_metadata.level = level_metadata.level;
+        result_metadata->files.push_back(live_file_metadata);
+      }
+      *metadata = result_metadata;
+    }
+    ROCKS_LOG_INFO(db_options.info_log, "[%s] Export succeeded.",
+                   cf_name.c_str());
+  } else {
+    // Failure: Clean up all the files/directories created.
+    ROCKS_LOG_INFO(db_options.info_log, "[%s] Export failed. %s",
+                   cf_name.c_str(), s.ToString().c_str());
+    std::vector<std::string> subchildren;
+    const auto cleanup_dir =
+        moved_to_user_specified_dir ? export_dir : tmp_export_dir;
+    db_->GetEnv()->GetChildren(cleanup_dir, &subchildren);
+    for (const auto& subchild : subchildren) {
+      const auto subchild_path = cleanup_dir + "/" + subchild;
+      const auto status = db_->GetEnv()->DeleteFile(subchild_path);
+      if (!status.ok()) {
+        ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup file %s: %s",
+                       subchild_path.c_str(), status.ToString().c_str());
+      }
+    }
+    const auto status = db_->GetEnv()->DeleteDir(cleanup_dir);
+    if (!status.ok()) {
+      ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup dir %s: %s",
+                     cleanup_dir.c_str(), status.ToString().c_str());
+    }
+  }
+  return s;
+}
+
+Status CheckpointImpl::ExportFilesInMetaData(
+    const DBOptions& db_options, const ColumnFamilyMetaData& metadata,
+    std::function<Status(const std::string& src_dirname,
+                         const std::string& src_fname)>
+        link_file_cb,
+    std::function<Status(const std::string& src_dirname,
+                         const std::string& src_fname)>
+        copy_file_cb) {
+  Status s;
+  auto hardlink_file = true;
+
+  // Copy/hard link files in metadata.
+  size_t num_files = 0;
+  for (const auto& level_metadata : metadata.levels) {
+    for (const auto& file_metadata : level_metadata.files) {
+      uint64_t number;
+      FileType type;
+      const auto ok = ParseFileName(file_metadata.name, &number, &type);
+      if (!ok) {
+        s = Status::Corruption("Could not parse file name");
+        break;
+      }
+
+      // We should only get sst files here.
+      assert(type == kTableFile);
+      assert(file_metadata.size > 0 && file_metadata.name[0] == '/');
+      const auto src_fname = file_metadata.name;
+      ++num_files;
+
+      if (hardlink_file) {
+        s = link_file_cb(db_->GetName(), src_fname);
+        if (num_files == 1 && s.IsNotSupported()) {
+          // Fallback to copy if link failed due to cross-device directories.
+          hardlink_file = false;
+          s = Status::OK();
+        }
+      }
+      if (!hardlink_file) {
+        s = copy_file_cb(db_->GetName(), src_fname);
+      }
+      if (!s.ok()) {
+        break;
+      }
+    }
+  }
+  ROCKS_LOG_INFO(db_options.info_log, "Number of table files %" ROCKSDB_PRIszt,
+                 num_files);
+  return s;
+}
 }  // namespace rocksdb
 
 #endif  // ROCKSDB_LITE

@@ -30,6 +30,17 @@ class CheckpointImpl : public Checkpoint {
   virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
                                   uint64_t log_size_for_flush) override;
 
+  // Exports all live SST files of a specified Column Family onto export_dir,
+  // returning SST files information in metadata.
+  // - SST files will be created as hard links when the directory specified
+  //   is in the same partition as the db directory, copied otherwise.
+  // - export_dir should not already exist and will be created by this API.
+  // - Always triggers a flush.
+  using Checkpoint::ExportColumnFamily;
+  virtual Status ExportColumnFamily(
+      ColumnFamilyHandle* handle, const std::string& export_dir,
+      ExportImportFilesMetaData** metadata) override;
+
   // Checkpoint logic can be customized by providing callbacks for link, copy,
   // or create.
   Status CreateCustomCheckpoint(
@@ -48,6 +59,18 @@ class CheckpointImpl : public Checkpoint {
  private:
   void CleanStagingDirectory(const std::string& path, Logger* info_log);
 
+  // Export logic customization by providing callbacks for link or copy.
+  Status ExportFilesInMetaData(
+      const DBOptions& db_options, const ColumnFamilyMetaData& metadata,
+      std::function<Status(const std::string& src_dirname,
+                           const std::string& fname)>
+          link_file_cb,
+      std::function<Status(const std::string& src_dirname,
+                           const std::string& fname)>
+          copy_file_cb);
+
+ private:
   DB* db_;
 };

@@ -26,6 +26,7 @@
 #include "test_util/fault_injection_test_env.h"
 #include "test_util/sync_point.h"
 #include "test_util/testharness.h"
+#include "test_util/testutil.h"
 
 namespace rocksdb {
 
 class CheckpointTest : public testing::Test {
@@ -44,6 +45,9 @@ class CheckpointTest : public testing::Test {
   Options last_options_;
   std::vector<ColumnFamilyHandle*> handles_;
   std::string snapshot_name_;
+  std::string export_path_;
+  ColumnFamilyHandle* cfh_reverse_comp_;
+  ExportImportFilesMetaData* metadata_;
 
   CheckpointTest() : env_(Env::Default()) {
     env_->SetBackgroundThreads(1, Env::LOW);
@@ -64,12 +68,24 @@ class CheckpointTest : public testing::Test {
     EXPECT_OK(DestroyDB(snapshot_tmp_name, options));
     env_->DeleteDir(snapshot_tmp_name);
     Reopen(options);
+    export_path_ = test::TmpDir(env_) + "/export";
+    test::DestroyDir(env_, export_path_);
+    cfh_reverse_comp_ = nullptr;
+    metadata_ = nullptr;
   }
 
   ~CheckpointTest() override {
     rocksdb::SyncPoint::GetInstance()->DisableProcessing();
     rocksdb::SyncPoint::GetInstance()->LoadDependency({});
     rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+    if (cfh_reverse_comp_) {
+      EXPECT_OK(db_->DestroyColumnFamilyHandle(cfh_reverse_comp_));
+      cfh_reverse_comp_ = nullptr;
+    }
+    if (metadata_) {
+      delete metadata_;
+      metadata_ = nullptr;
+    }
     Close();
     Options options;
     options.db_paths.emplace_back(dbname_, 0);
@@ -78,6 +94,7 @@ class CheckpointTest : public testing::Test {
     options.db_paths.emplace_back(dbname_ + "_4", 0);
     EXPECT_OK(DestroyDB(dbname_, options));
     EXPECT_OK(DestroyDB(snapshot_name_, options));
+    test::DestroyDir(env_, export_path_);
   }
 
   // Return the current option configuration.
@@ -140,6 +157,12 @@ class CheckpointTest : public testing::Test {
     ASSERT_OK(TryReopen(options));
   }
 
+  void CompactAll() {
+    for (auto h : handles_) {
+      ASSERT_OK(db_->CompactRange(CompactRangeOptions(), h, nullptr, nullptr));
+    }
+  }
+
   void Close() {
     for (auto h : handles_) {
       delete h;
@@ -289,6 +312,109 @@ TEST_F(CheckpointTest, GetSnapshotLink) {
   }
 }
 
+TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) {
+  // Create a database
+  Status s;
+  auto options = CurrentOptions();
+  options.create_if_missing = true;
+  CreateAndReopenWithCF({}, options);
+
+  // Helper to verify the number of files in metadata and export dir
+  auto verify_files_exported = [&](const ExportImportFilesMetaData& metadata,
+                                   int num_files_expected) {
+    ASSERT_EQ(metadata.files.size(), num_files_expected);
+    std::vector<std::string> subchildren;
+    env_->GetChildren(export_path_, &subchildren);
+    int num_children = 0;
+    for (const auto& child : subchildren) {
+      if (child != "." && child != "..") {
+        ++num_children;
+      }
+    }
+    ASSERT_EQ(num_children, num_files_expected);
+  };
+
+  // Test DefaultColumnFamily
+  {
+    const auto key = std::string("foo");
+    ASSERT_OK(Put(key, "v1"));
+
+    Checkpoint* checkpoint;
+    ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+
+    // Export the Tables and verify
+    ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
+                                             export_path_, &metadata_));
+    verify_files_exported(*metadata_, 1);
+    ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name());
+    test::DestroyDir(env_, export_path_);
+    delete metadata_;
+    metadata_ = nullptr;
+
+    // Check again after compaction
+    CompactAll();
+    ASSERT_OK(Put(key, "v2"));
+    ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
+                                             export_path_, &metadata_));
+    verify_files_exported(*metadata_, 2);
+    ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name());
+    test::DestroyDir(env_, export_path_);
+    delete metadata_;
+    metadata_ = nullptr;
+    delete checkpoint;
+  }
+
+  // Test non default column family with non default comparator
+  {
+    auto cf_options = CurrentOptions();
+    cf_options.comparator = ReverseBytewiseComparator();
+    ASSERT_OK(db_->CreateColumnFamily(cf_options, "yoyo", &cfh_reverse_comp_));
+
+    const auto key = std::string("foo");
+    ASSERT_OK(db_->Put(WriteOptions(), cfh_reverse_comp_, key, "v1"));
+
+    Checkpoint* checkpoint;
+    ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+
+    // Export the Tables and verify
+    ASSERT_OK(checkpoint->ExportColumnFamily(cfh_reverse_comp_, export_path_,
+                                             &metadata_));
+    verify_files_exported(*metadata_, 1);
+    ASSERT_EQ(metadata_->db_comparator_name,
+              ReverseBytewiseComparator()->Name());
+    delete checkpoint;
+  }
+}
+
+TEST_F(CheckpointTest, ExportColumnFamilyNegativeTest) {
+  // Create a database
+  Status s;
+  auto options = CurrentOptions();
+  options.create_if_missing = true;
+  CreateAndReopenWithCF({}, options);
+
+  const auto key = std::string("foo");
+  ASSERT_OK(Put(key, "v1"));
+
+  Checkpoint* checkpoint;
+  ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+
+  // Export onto existing directory
+  env_->CreateDirIfMissing(export_path_);
+  ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
+                                           export_path_, &metadata_),
+            Status::InvalidArgument("Specified export_dir exists"));
+  test::DestroyDir(env_, export_path_);
+
+  // Export with invalid directory specification
+  export_path_ = "";
+  ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
+                                           export_path_, &metadata_),
+            Status::InvalidArgument("Specified export_dir invalid"));
+  delete checkpoint;
+}
+
 TEST_F(CheckpointTest, CheckpointCF) {
   Options options = CurrentOptions();
   CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options);
