Add internal compaction API for Secondary instance (#8171)

Summary:
Add compaction API for secondary instance, which compact the files to a secondary DB path without installing to the LSM tree.
The API will be used to remote compaction.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8171

Test Plan: `make check`

Reviewed By: ajkr

Differential Revision: D27694545

Pulled By: jay-zhuang

fbshipit-source-id: 8ff3ec1bffdb2e1becee994918850c8902caf731
main
Jay Zhuang 3 years ago committed by Facebook GitHub Bot
parent e85d8a6517
commit f0fca2b1d5
  1. 154
      db/compaction/compaction_job.cc
  2. 161
      db/compaction/compaction_job.h
  3. 8
      db/db_impl/db_impl.cc
  4. 28
      db/db_impl/db_impl.h
  5. 88
      db/db_impl/db_impl_secondary.cc
  6. 20
      db/db_impl/db_impl_secondary.h
  7. 200
      db/db_secondary_test.cc
  8. 4
      db/output_validator.h

@ -312,14 +312,19 @@ CompactionJob::CompactionJob(
const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
const std::string& db_session_id, std::string full_history_ts_low,
BlobFileCompletionCallback* blob_callback)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
: compact_(new CompactionState(compaction)),
compaction_stats_(compaction->compaction_reason(), 1),
db_options_(db_options),
log_buffer_(log_buffer),
output_directory_(output_directory),
stats_(stats),
bottommost_level_(false),
write_hint_(Env::WLTH_NOT_SET),
job_id_(job_id),
compaction_job_stats_(compaction_job_stats),
dbname_(dbname),
db_id_(db_id),
db_session_id_(db_session_id),
db_options_(db_options),
file_options_(file_options),
env_(db_options.env),
io_tracer_(io_tracer),
@ -330,11 +335,8 @@ CompactionJob::CompactionJob(
shutting_down_(shutting_down),
manual_compaction_paused_(manual_compaction_paused),
preserve_deletes_seqnum_(preserve_deletes_seqnum),
log_buffer_(log_buffer),
db_directory_(db_directory),
output_directory_(output_directory),
blob_output_directory_(blob_output_directory),
stats_(stats),
db_mutex_(db_mutex),
db_error_handler_(db_error_handler),
existing_snapshots_(std::move(existing_snapshots)),
@ -342,10 +344,8 @@ CompactionJob::CompactionJob(
snapshot_checker_(snapshot_checker),
table_cache_(std::move(table_cache)),
event_logger_(event_logger),
bottommost_level_(false),
paranoid_file_checks_(paranoid_file_checks),
measure_io_stats_(measure_io_stats),
write_hint_(Env::WLTH_NOT_SET),
thread_pri_(thread_pri),
full_history_ts_low_(std::move(full_history_ts_low)),
blob_callback_(blob_callback) {
@ -1550,9 +1550,7 @@ Status CompactionJob::FinishCompactionOutputFile(
FileDescriptor output_fd;
uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
if (meta != nullptr) {
fname =
TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
meta->fd.GetNumber(), meta->fd.GetPathId());
fname = GetTableFileName(meta->fd.GetNumber());
output_fd = meta->fd;
oldest_blob_file_number = meta->oldest_blob_file_number;
} else {
@ -1672,9 +1670,7 @@ Status CompactionJob::OpenCompactionOutputFile(
assert(sub_compact->builder == nullptr);
// no need to lock because VersionSet::next_file_number_ is atomic
uint64_t file_number = versions_->NewFileNumber();
std::string fname =
TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
file_number, sub_compact->compaction->output_path_id());
std::string fname = GetTableFileName(file_number);
// Fire events.
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
#ifndef ROCKSDB_LITE
@ -1937,4 +1933,132 @@ void CompactionJob::LogCompaction() {
}
}
std::string CompactionJob::GetTableFileName(uint64_t file_number) {
return TableFileName(compact_->compaction->immutable_cf_options()->cf_paths,
file_number, compact_->compaction->output_path_id());
}
std::string CompactionServiceCompactionJob::GetTableFileName(
uint64_t file_number) {
return MakeTableFileName(output_path_, file_number);
}
CompactionServiceCompactionJob::CompactionServiceCompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const FileOptions& file_options, VersionSet* versions,
const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
FSDirectory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id, const std::string& db_session_id,
const std::string& output_path,
const CompactionServiceInput& compaction_service_input,
CompactionServiceResult* compaction_service_result)
: CompactionJob(
job_id, compaction, db_options, file_options, versions, shutting_down,
0, log_buffer, nullptr, output_directory, nullptr, stats, db_mutex,
db_error_handler, existing_snapshots, kMaxSequenceNumber, nullptr,
table_cache, event_logger,
compaction->mutable_cf_options()->paranoid_file_checks,
compaction->mutable_cf_options()->report_bg_io_stats, dbname,
&(compaction_service_result->stats), Env::Priority::USER, io_tracer,
nullptr, db_id, db_session_id,
compaction->column_family_data()->GetFullHistoryTsLow()),
output_path_(output_path),
compaction_input_(compaction_service_input),
compaction_result_(compaction_service_result) {}
Status CompactionServiceCompactionJob::Run() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_RUN);
auto* c = compact_->compaction;
assert(c->column_family_data() != nullptr);
assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
compact_->compaction->level()) > 0);
write_hint_ =
c->column_family_data()->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
compact_->sub_compact_states.emplace_back(c, compaction_input_.begin,
compaction_input_.end,
compaction_input_.approx_size);
log_buffer_->FlushBufferToLog();
LogCompaction();
const uint64_t start_micros = db_options_.clock->NowMicros();
// Pick the only sub-compaction we should have
assert(compact_->sub_compact_states.size() == 1);
SubcompactionState* sub_compact = compact_->sub_compact_states.data();
ProcessKeyValueCompaction(sub_compact);
compaction_stats_.micros = db_options_.clock->NowMicros() - start_micros;
compaction_stats_.cpu_micros = sub_compact->compaction_job_stats.cpu_micros;
RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
compaction_stats_.cpu_micros);
Status status = sub_compact->status;
IOStatus io_s = sub_compact->io_status;
if (io_status_.ok()) {
io_status_ = io_s;
}
if (status.ok()) {
constexpr IODebugContext* dbg = nullptr;
if (output_directory_) {
io_s = output_directory_->Fsync(IOOptions(), dbg);
}
}
if (io_status_.ok()) {
io_status_ = io_s;
}
if (status.ok()) {
status = io_s;
}
if (status.ok()) {
// TODO: Add verify_table() and VerifyCompactionFileConsistency()
}
// Finish up all book-keeping to unify the subcompaction results
AggregateStatistics();
UpdateCompactionStats();
compaction_result_->bytes_written = IOSTATS(bytes_written);
compaction_result_->bytes_read = IOSTATS(bytes_read);
RecordCompactionIOStats();
LogFlush(db_options_.info_log);
compact_->status = status;
compact_->status.PermitUncheckedError();
// Build compaction result
compaction_result_->output_level = compact_->compaction->output_level();
compaction_result_->output_path = output_path_;
for (const auto& output_file : sub_compact->outputs) {
auto& meta = output_file.meta;
compaction_result_->output_files.emplace_back(
MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno,
meta.fd.largest_seqno, meta.smallest.Encode().ToString(),
meta.largest.Encode().ToString(), meta.oldest_ancester_time,
meta.file_creation_time, output_file.validator.GetHash(),
meta.marked_for_compaction);
}
compaction_result_->num_output_records = sub_compact->num_output_records;
compaction_result_->total_bytes = sub_compact->total_bytes;
return status;
}
void CompactionServiceCompactionJob::CleanupCompaction() {
CompactionJob::CleanupCompaction();
}
} // namespace ROCKSDB_NAMESPACE

@ -84,7 +84,7 @@ class CompactionJob {
std::string full_history_ts_low = "",
BlobFileCompletionCallback* blob_callback = nullptr);
~CompactionJob();
virtual ~CompactionJob();
// no copy/move
CompactionJob(CompactionJob&& job) = delete;
@ -107,11 +107,35 @@ class CompactionJob {
// Return the IO status
IOStatus io_status() const { return io_status_; }
private:
protected:
struct SubcompactionState;
// CompactionJob state
struct CompactionState;
void AggregateStatistics();
void UpdateCompactionStats();
void LogCompaction();
void RecordCompactionIOStats();
void CleanupCompaction();
// Call compaction filter. Then iterate through input and compact the
// kv-pairs
void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
CompactionState* compact_;
InternalStats::CompactionStats compaction_stats_;
const ImmutableDBOptions& db_options_;
LogBuffer* log_buffer_;
FSDirectory* output_directory_;
Statistics* stats_;
// Is this compaction creating a file in the bottom most level?
bool bottommost_level_;
Env::WriteLifeTimeHint write_hint_;
IOStatus io_status_;
private:
// Generates a histogram representing potential divisions of key ranges from
// the input. It adds the starting and/or ending keys of certain input files
// to the working set and then finds the approximate size of data in between
@ -122,9 +146,6 @@ class CompactionJob {
// update the thread status for starting a compaction.
void ReportStartedCompaction(Compaction* compaction);
void AllocateCompactionOutputFileNumbers();
// Call compaction filter. Then iterate through input and compact the
// kv-pairs
void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
Status FinishCompactionOutputFile(
const Status& input_status, SubcompactionState* sub_compact,
@ -132,33 +153,23 @@ class CompactionJob {
CompactionIterationStats* range_del_out_stats,
const Slice* next_table_min_key = nullptr);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
void RecordCompactionIOStats();
Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
void CleanupCompaction();
void UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const;
void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats,
CompactionJobStats* compaction_job_stats = nullptr);
void UpdateCompactionStats();
void UpdateCompactionInputStatsHelper(
int* num_files, uint64_t* bytes_read, int input_level);
void LogCompaction();
int job_id_;
// CompactionJob state
struct CompactionState;
CompactionState* compact_;
CompactionJobStats* compaction_job_stats_;
InternalStats::CompactionStats compaction_stats_;
// DBImpl state
const std::string& dbname_;
const std::string db_id_;
const std::string db_session_id_;
const ImmutableDBOptions& db_options_;
const FileOptions file_options_;
Env* env_;
@ -170,11 +181,8 @@ class CompactionJob {
const std::atomic<bool>* shutting_down_;
const std::atomic<int>* manual_compaction_paused_;
const SequenceNumber preserve_deletes_seqnum_;
LogBuffer* log_buffer_;
FSDirectory* db_directory_;
FSDirectory* output_directory_;
FSDirectory* blob_output_directory_;
Statistics* stats_;
InstrumentedMutex* db_mutex_;
ErrorHandler* db_error_handler_;
// If there were two snapshots with seq numbers s1 and
@ -194,19 +202,128 @@ class CompactionJob {
EventLogger* event_logger_;
// Is this compaction creating a file in the bottom most level?
bool bottommost_level_;
bool paranoid_file_checks_;
bool measure_io_stats_;
// Stores the Slices that designate the boundaries for each subcompaction
std::vector<Slice> boundaries_;
// Stores the approx size of keys covered in the range of each subcompaction
std::vector<uint64_t> sizes_;
Env::WriteLifeTimeHint write_hint_;
Env::Priority thread_pri_;
IOStatus io_status_;
std::string full_history_ts_low_;
BlobFileCompletionCallback* blob_callback_;
// Get table file name in where it's outputting to, which should also be in
// `output_directory_`.
virtual std::string GetTableFileName(uint64_t file_number);
};
// CompactionServiceInput is used the pass compaction information between two
// db instances. It contains the information needed to do a compaction. It
// doesn't contain the LSM tree information, which is passed though MANIFEST
// file.
struct CompactionServiceInput {
ColumnFamilyDescriptor column_family;
DBOptions db_options;
std::vector<SequenceNumber> snapshots;
// SST files for compaction, it should already be expended to include all the
// files needed for this compaction, for both input level files and output
// level files.
std::vector<std::string> input_files;
int output_level;
// information for subcompaction
Slice* begin = nullptr;
Slice* end = nullptr;
uint64_t approx_size = 0;
};
// CompactionServiceOutputFile is the metadata for the output SST file
struct CompactionServiceOutputFile {
std::string file_name;
SequenceNumber smallest_seqno;
SequenceNumber largest_seqno;
std::string smallest_internal_key;
std::string largest_internal_key;
uint64_t oldest_ancester_time;
uint64_t file_creation_time;
uint64_t paranoid_hash;
bool marked_for_compaction;
CompactionServiceOutputFile() = default;
CompactionServiceOutputFile(
const std::string& name, SequenceNumber smallest, SequenceNumber largest,
std::string _smallest_internal_key, std::string _largest_internal_key,
uint64_t _oldest_ancester_time, uint64_t _file_creation_time,
uint64_t _paranoid_hash, bool _marked_for_compaction)
: file_name(name),
smallest_seqno(smallest),
largest_seqno(largest),
smallest_internal_key(std::move(_smallest_internal_key)),
largest_internal_key(std::move(_largest_internal_key)),
oldest_ancester_time(_oldest_ancester_time),
file_creation_time(_file_creation_time),
paranoid_hash(_paranoid_hash),
marked_for_compaction(_marked_for_compaction) {}
};
// CompactionServiceResult contains the compaction result from a different db
// instance, with these information, the primary db instance with write
// permission is able to install the result to the DB.
struct CompactionServiceResult {
std::vector<CompactionServiceOutputFile> output_files;
int output_level;
// location of the output files
std::string output_path;
// some statistics about the compaction
uint64_t num_output_records;
uint64_t total_bytes;
uint64_t bytes_read;
uint64_t bytes_written;
CompactionJobStats stats;
};
// CompactionServiceCompactionJob is an read-only compaction job, it takes
// input information from `compaction_service_input` and put result information
// in `compaction_service_result`, the SST files are generated to `output_path`.
class CompactionServiceCompactionJob : private CompactionJob {
public:
CompactionServiceCompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const FileOptions& file_options, VersionSet* versions,
const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
FSDirectory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id, const std::string& db_session_id,
const std::string& output_path,
const CompactionServiceInput& compaction_service_input,
CompactionServiceResult* compaction_service_result);
// Run the compaction in current thread and return the result
Status Run();
void CleanupCompaction();
IOStatus io_status() const { return CompactionJob::io_status(); }
private:
// Get table file name in output_path
std::string GetTableFileName(uint64_t file_number) override;
// Specific the compaction output path, otherwise it uses default DB path
const std::string output_path_;
// Compaction job input
const CompactionServiceInput& compaction_input_;
// Compaction job result
CompactionServiceResult* compaction_result_;
};
} // namespace ROCKSDB_NAMESPACE

@ -160,14 +160,17 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
mutex_(stats_, immutable_db_options_.clock, DB_MUTEX_WAIT_MICROS,
immutable_db_options_.use_adaptive_mutex),
default_cf_handle_(nullptr),
error_handler_(this, immutable_db_options_, &mutex_),
event_logger_(immutable_db_options_.info_log.get()),
max_total_in_memory_state_(0),
file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite(
file_options_, immutable_db_options_)),
seq_per_batch_(seq_per_batch),
batch_per_txn_(batch_per_txn),
db_lock_(nullptr),
next_job_id_(1),
shutting_down_(false),
db_lock_(nullptr),
manual_compaction_paused_(false),
bg_cv_(&mutex_),
logfile_number_(0),
@ -194,7 +197,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
pending_purge_obsolete_files_(0),
delete_obsolete_files_last_run_(immutable_db_options_.clock->NowMicros()),
last_stats_dump_time_microsec_(0),
next_job_id_(1),
has_unpersisted_data_(false),
unable_to_release_oldest_log_(false),
num_running_ingest_file_(0),
@ -202,7 +204,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
wal_manager_(immutable_db_options_, file_options_, io_tracer_,
seq_per_batch),
#endif // ROCKSDB_LITE
event_logger_(immutable_db_options_.info_log.get()),
bg_work_paused_(0),
bg_compaction_paused_(0),
refitting_level_(false),
@ -231,7 +232,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
own_sfm_(options.sst_file_manager == nullptr),
preserve_deletes_(options.preserve_deletes),
closed_(false),
error_handler_(this, immutable_db_options_, &mutex_),
atomic_flush_install_cv_(&mutex_),
blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_,
&error_handler_) {

@ -1130,6 +1130,14 @@ class DBImpl : public DB {
ColumnFamilyHandleImpl* default_cf_handle_;
InternalStats* default_cf_internal_stats_;
// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;
ErrorHandler error_handler_;
// Unified interface for logging events
EventLogger event_logger_;
// only used for dynamically adjusting max_total_wal_size. it is a sum of
// [write_buffer_size * max_write_buffer_number] over all column families
uint64_t max_total_in_memory_state_;
@ -1160,6 +1168,12 @@ class DBImpl : public DB {
// Default: true
const bool batch_per_txn_;
// Each flush or compaction gets its own job id. this counter makes sure
// they're unique
std::atomic<int> next_job_id_;
std::atomic<bool> shutting_down_;
// Except in DB::Open(), WriteOptionsFile can only be called when:
// Persist options to options file.
// If need_mutex_lock = false, the method will lock DB mutex.
@ -1940,9 +1954,6 @@ class DBImpl : public DB {
Status IncreaseFullHistoryTsLow(ColumnFamilyData* cfd, std::string ts_low);
// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;
// Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_;
@ -1956,8 +1967,6 @@ class DBImpl : public DB {
// mutex_, the order should be first mutex_ and then log_write_mutex_.
InstrumentedMutex log_write_mutex_;
std::atomic<bool> shutting_down_;
// If zero, manual compactions are allowed to proceed. If non-zero, manual
// compactions may still be running, but will quickly fail with
// `Status::Incomplete`. The value indicates how many threads have paused
@ -2166,10 +2175,6 @@ class DBImpl : public DB {
// Number of threads intending to write to memtable
std::atomic<size_t> pending_memtable_writes_ = {};
// Each flush or compaction gets its own job id. this counter makes sure
// they're unique
std::atomic<int> next_job_id_;
// A flag indicating whether the current rocksdb database has any
// data that is not yet persisted into either WAL or SST file.
// Used when disableWAL is true.
@ -2198,9 +2203,6 @@ class DBImpl : public DB {
WalManager wal_manager_;
#endif // ROCKSDB_LITE
// Unified interface for logging events
EventLogger event_logger_;
// A value of > 0 temporarily disables scheduling of background work
int bg_work_paused_;
@ -2268,8 +2270,6 @@ class DBImpl : public DB {
// Flag to check whether Close() has been called on this DB
bool closed_;
ErrorHandler error_handler_;
// Conditional variable to coordinate installation of atomic flush results.
// With atomic flush, each bg thread installs the result of flushing multiple
// column families, and different threads can flush different column

@ -17,8 +17,10 @@ namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE
DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
const std::string& dbname)
: DBImpl(db_options, dbname) {
const std::string& dbname,
std::string secondary_path)
: DBImpl(db_options, dbname, false, true, true),
secondary_path_(std::move(secondary_path)) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Opening the db in secondary mode");
LogFlush(immutable_db_options_.info_log);
@ -617,7 +619,7 @@ Status DB::OpenAsSecondary(
}
handles->clear();
DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname);
DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname, secondary_path);
impl->versions_.reset(new ReactiveVersionSet(
dbname, &impl->immutable_db_options_, impl->file_options_,
impl->table_cache_.get(), impl->write_buffer_manager_,
@ -663,6 +665,86 @@ Status DB::OpenAsSecondary(
}
return s;
}
Status DBImplSecondary::CompactWithoutInstallation(
ColumnFamilyHandle* cfh, const CompactionServiceInput& input,
CompactionServiceResult* result) {
InstrumentedMutexLock l(&mutex_);
auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
if (!cfd) {
return Status::InvalidArgument("Cannot find column family" +
cfh->GetName());
}
std::unordered_set<uint64_t> input_set;
for (const auto& file_name : input.input_files) {
input_set.insert(TableFileNameToNumber(file_name));
}
auto* version = cfd->current();
ColumnFamilyMetaData cf_meta;
version->GetColumnFamilyMetaData(&cf_meta);
const MutableCFOptions* mutable_cf_options = cfd->GetLatestMutableCFOptions();
ColumnFamilyOptions cf_options = cfd->GetLatestCFOptions();
VersionStorageInfo* vstorage = version->storage_info();
// Use comp_options to reuse some CompactFiles functions
CompactionOptions comp_options;
comp_options.compression = kDisableCompressionOption;
comp_options.output_file_size_limit = MaxFileSizeForLevel(
*mutable_cf_options, input.output_level, cf_options.compaction_style,
vstorage->base_level(), cf_options.level_compaction_dynamic_level_bytes);
std::vector<CompactionInputFiles> input_files;
Status s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage, comp_options);
if (!s.ok()) {
return s;
}
std::unique_ptr<Compaction> c;
assert(cfd->compaction_picker());
c.reset(cfd->compaction_picker()->CompactFiles(
comp_options, input_files, input.output_level, vstorage,
*mutable_cf_options, mutable_db_options_, 0));
assert(c != nullptr);
c->SetInputVersion(version);
// Create output directory if it's not existed yet
std::unique_ptr<FSDirectory> output_dir;
s = CreateAndNewDirectory(fs_.get(), secondary_path_, &output_dir);
if (!s.ok()) {
return s;
}
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
immutable_db_options_.info_log.get());
const int job_id = next_job_id_.fetch_add(1);
CompactionServiceCompactionJob compaction_job(
job_id, c.get(), immutable_db_options_, file_options_for_compaction_,
versions_.get(), &shutting_down_, &log_buffer, output_dir.get(), stats_,
&mutex_, &error_handler_, input.snapshots, table_cache_, &event_logger_,
dbname_, io_tracer_, db_id_, db_session_id_, secondary_path_, input,
result);
mutex_.Unlock();
s = compaction_job.Run();
mutex_.Lock();
// clean up
compaction_job.io_status().PermitUncheckedError();
compaction_job.CleanupCompaction();
c->ReleaseCompactionFiles(s);
c.reset();
return s;
}
#else // !ROCKSDB_LITE
Status DB::OpenAsSecondary(const Options& /*options*/,

@ -71,7 +71,8 @@ class LogReaderContainer {
// effort attempts to catch up with the primary.
class DBImplSecondary : public DBImpl {
public:
DBImplSecondary(const DBOptions& options, const std::string& dbname);
DBImplSecondary(const DBOptions& options, const std::string& dbname,
std::string secondary_path);
~DBImplSecondary() override;
// Recover by replaying MANIFEST and WAL. Also initialize manifest_reader_
@ -222,6 +223,14 @@ class DBImplSecondary : public DBImpl {
// not flag the missing file as inconsistency.
Status CheckConsistency() override;
#ifndef NDEBUG
Status TEST_CompactWithoutInstallation(ColumnFamilyHandle* cfh,
const CompactionServiceInput& input,
CompactionServiceResult* result) {
return CompactWithoutInstallation(cfh, input, result);
}
#endif // NDEBUG
protected:
// ColumnFamilyCollector is a write batch handler which does nothing
// except recording unique column family IDs
@ -316,6 +325,13 @@ class DBImplSecondary : public DBImpl {
std::unordered_set<ColumnFamilyData*>* cfds_changed,
JobContext* job_context);
// Run compaction without installation, the output files will be placed in the
// secondary DB path. The LSM tree won't be changed, the secondary DB is still
// in read-only mode.
Status CompactWithoutInstallation(ColumnFamilyHandle* cfh,
const CompactionServiceInput& input,
CompactionServiceResult* result);
std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
std::unique_ptr<Status> manifest_reader_status_;
@ -326,6 +342,8 @@ class DBImplSecondary : public DBImpl {
// Current WAL number replayed for each column family.
std::unordered_map<ColumnFamilyData*, uint64_t> cfd_to_current_log_;
const std::string secondary_path_;
};
} // namespace ROCKSDB_NAMESPACE

@ -147,6 +147,206 @@ TEST_F(DBSecondaryTest, ReopenAsSecondary) {
ASSERT_EQ(2, count);
}
TEST_F(DBSecondaryTest, SimpleInternalCompaction) {
Options options;
options.env = env_;
Reopen(options);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
ASSERT_OK(Flush());
}
CompactionServiceInput input;
ColumnFamilyMetaData meta;
db_->GetColumnFamilyMetaData(&meta);
for (auto& file : meta.levels[0].files) {
ASSERT_EQ(0, meta.levels[0].level);
input.input_files.push_back(file.name);
}
ASSERT_EQ(input.input_files.size(), 3);
input.output_level = 1;
Close();
options.max_open_files = -1;
OpenSecondary(options);
auto cfh = db_secondary_->DefaultColumnFamily();
CompactionServiceResult result;
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input,
&result));
ASSERT_EQ(result.output_files.size(), 1);
InternalKey smallest, largest;
smallest.DecodeFrom(result.output_files[0].smallest_internal_key);
largest.DecodeFrom(result.output_files[0].largest_internal_key);
ASSERT_EQ(smallest.user_key().ToString(), "bar");
ASSERT_EQ(largest.user_key().ToString(), "foo");
ASSERT_EQ(result.output_level, 1);
ASSERT_EQ(result.output_path, this->secondary_path_);
ASSERT_EQ(result.num_output_records, 2);
ASSERT_GT(result.bytes_written, 0);
}
TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
Options options;
options.env = env_;
options.disable_auto_compactions = true;
Reopen(options);
const int kRangeL2 = 10;
const int kRangeL1 = 30;
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i * kRangeL2), "value" + ToString(i)));
ASSERT_OK(Put(Key((i + 1) * kRangeL2 - 1), "value" + ToString(i)));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
for (int i = 0; i < 5; i++) {
ASSERT_OK(Put(Key(i * kRangeL1), "value" + ToString(i)));
ASSERT_OK(Put(Key((i + 1) * kRangeL1 - 1), "value" + ToString(i)));
ASSERT_OK(Flush());
}
MoveFilesToLevel(1);
for (int i = 0; i < 4; i++) {
ASSERT_OK(Put(Key(i * 30), "value" + ToString(i)));
ASSERT_OK(Put(Key(i * 30 + 50), "value" + ToString(i)));
ASSERT_OK(Flush());
}
ColumnFamilyMetaData meta;
db_->GetColumnFamilyMetaData(&meta);
// pick 2 files on level 0 for compaction, which has 3 overlap files on L1
CompactionServiceInput input1;
input1.input_files.push_back(meta.levels[0].files[2].name);
input1.input_files.push_back(meta.levels[0].files[3].name);
input1.input_files.push_back(meta.levels[1].files[0].name);
input1.input_files.push_back(meta.levels[1].files[1].name);
input1.input_files.push_back(meta.levels[1].files[2].name);
input1.output_level = 1;
options.max_open_files = -1;
Close();
OpenSecondary(options);
auto cfh = db_secondary_->DefaultColumnFamily();
CompactionServiceResult result;
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input1,
&result));
// pick 2 files on level 1 for compaction, which has 6 overlap files on L2
CompactionServiceInput input2;
input2.input_files.push_back(meta.levels[1].files[1].name);
input2.input_files.push_back(meta.levels[1].files[2].name);
for (int i = 3; i < 9; i++) {
input2.input_files.push_back(meta.levels[2].files[i].name);
}
input2.output_level = 2;
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2,
&result));
CloseSecondary();
// delete all l2 files, without update manifest
for (auto& file : meta.levels[2].files) {
ASSERT_OK(env_->DeleteFile(dbname_ + file.name));
}
OpenSecondary(options);
cfh = db_secondary_->DefaultColumnFamily();
Status s = db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2,
&result);
ASSERT_TRUE(s.IsInvalidArgument());
// TODO: L0 -> L1 compaction should success, currently version is not built
// if files is missing.
// ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh,
// input1, &result));
}
TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) {
Options options;
options.env = env_;
options.level0_file_num_compaction_trigger = 4;
Reopen(options);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
ASSERT_OK(Flush());
}
CompactionServiceInput input;
ColumnFamilyMetaData meta;
db_->GetColumnFamilyMetaData(&meta);
for (auto& file : meta.levels[0].files) {
ASSERT_EQ(0, meta.levels[0].level);
input.input_files.push_back(file.name);
}
ASSERT_EQ(input.input_files.size(), 3);
input.output_level = 1;
// trigger compaction to delete the files for secondary instance compaction
ASSERT_OK(Put("foo", "foo_value" + std::to_string(3)));
ASSERT_OK(Put("bar", "bar_value" + std::to_string(3)));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
Close();
options.max_open_files = -1;
OpenSecondary(options);
auto cfh = db_secondary_->DefaultColumnFamily();
CompactionServiceResult result;
Status s =
db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result);
ASSERT_TRUE(s.IsInvalidArgument());
}
TEST_F(DBSecondaryTest, InternalCompactionMissingFiles) {
Options options;
options.env = env_;
options.level0_file_num_compaction_trigger = 4;
Reopen(options);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
ASSERT_OK(Flush());
}
CompactionServiceInput input;
ColumnFamilyMetaData meta;
db_->GetColumnFamilyMetaData(&meta);
for (auto& file : meta.levels[0].files) {
ASSERT_EQ(0, meta.levels[0].level);
input.input_files.push_back(file.name);
}
ASSERT_EQ(input.input_files.size(), 3);
input.output_level = 1;
Close();
ASSERT_OK(env_->DeleteFile(dbname_ + input.input_files[0]));
options.max_open_files = -1;
OpenSecondary(options);
auto cfh = db_secondary_->DefaultColumnFamily();
CompactionServiceResult result;
Status s =
db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result);
ASSERT_TRUE(s.IsInvalidArgument());
input.input_files.erase(input.input_files.begin());
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input,
&result));
}
TEST_F(DBSecondaryTest, OpenAsSecondary) {
Options options;
options.env = env_;

@ -33,11 +33,13 @@ class OutputValidator {
return GetHash() == other_validator.GetHash();
}
private:
// Not (yet) intended to be persisted, so subject to change
// without notice between releases.
uint64_t GetHash() const { return paranoid_hash_; }
void SetHash(uint64_t hash) { paranoid_hash_ = hash; }
private:
const InternalKeyComparator& icmp_;
std::string prev_key_;
uint64_t paranoid_hash_ = 0;

Loading…
Cancel
Save