// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "db/db_impl_secondary.h" #include "db/db_iter.h" #include "db/merge_context.h" #include "monitoring/perf_context_imp.h" #include "util/auto_roll_logger.h" namespace rocksdb { #ifndef ROCKSDB_LITE DBImplSecondary::DBImplSecondary(const DBOptions& db_options, const std::string& dbname) : DBImpl(db_options, dbname) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Opening the db in secondary mode"); LogFlush(immutable_db_options_.info_log); } DBImplSecondary::~DBImplSecondary() {} Status DBImplSecondary::Recover( const std::vector& column_families, bool /*readonly*/, bool /*error_if_log_file_exist*/, bool /*error_if_data_exists_in_logs*/) { mutex_.AssertHeld(); Status s; s = static_cast(versions_.get()) ->Recover(column_families, &manifest_reader_, &manifest_reporter_, &manifest_reader_status_); if (!s.ok()) { return s; } if (immutable_db_options_.paranoid_checks && s.ok()) { s = CheckConsistency(); } // Initial max_total_in_memory_state_ before recovery logs. max_total_in_memory_state_ = 0; for (auto cfd : *versions_->GetColumnFamilySet()) { auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); max_total_in_memory_state_ += mutable_cf_options->write_buffer_size * mutable_cf_options->max_write_buffer_number; } if (s.ok()) { default_cf_handle_ = new ColumnFamilyHandleImpl( versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); single_column_family_mode_ = versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1; } // TODO: attempt to recover from WAL files. return s; } // Implementation of the DB interface Status DBImplSecondary::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { return GetImpl(read_options, column_family, key, value); } Status DBImplSecondary::GetImpl(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val) { assert(pinnable_val != nullptr); PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); StopWatch sw(env_, stats_, DB_GET); PERF_TIMER_GUARD(get_snapshot_time); auto cfh = static_cast(column_family); ColumnFamilyData* cfd = cfh->cfd(); if (tracer_) { InstrumentedMutexLock lock(&trace_mutex_); if (tracer_) { tracer_->Get(column_family, key); } } // Acquire SuperVersion SuperVersion* super_version = GetAndRefSuperVersion(cfd); SequenceNumber snapshot = versions_->LastSequence(); MergeContext merge_context; SequenceNumber max_covering_tombstone_seq = 0; Status s; LookupKey lkey(key, snapshot); PERF_TIMER_STOP(get_snapshot_time); bool done = false; if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, &max_covering_tombstone_seq, read_options)) { done = true; pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); } else if ((s.ok() || s.IsMergeInProgress()) && super_version->imm->Get( lkey, pinnable_val->GetSelf(), &s, &merge_context, &max_covering_tombstone_seq, read_options)) { done = true; pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); } if (!done && !s.ok() && !s.IsMergeInProgress()) { ReturnAndCleanupSuperVersion(cfd, super_version); return s; } if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); super_version->current->Get(read_options, lkey, pinnable_val, &s, &merge_context, &max_covering_tombstone_seq); RecordTick(stats_, MEMTABLE_MISS); } { PERF_TIMER_GUARD(get_post_process_time); ReturnAndCleanupSuperVersion(cfd, super_version); RecordTick(stats_, NUMBER_KEYS_READ); size_t size = pinnable_val->size(); RecordTick(stats_, BYTES_READ, size); RecordTimeToHistogram(stats_, BYTES_PER_READ, size); PERF_COUNTER_ADD(get_read_bytes, size); } return s; } Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options, ColumnFamilyHandle* column_family) { if (read_options.managed) { return NewErrorIterator( Status::NotSupported("Managed iterator is not supported anymore.")); } if (read_options.read_tier == kPersistedTier) { return NewErrorIterator(Status::NotSupported( "ReadTier::kPersistedData is not yet supported in iterators.")); } Iterator* result = nullptr; auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); ReadCallback* read_callback = nullptr; // No read callback provided. if (read_options.tailing) { return NewErrorIterator(Status::NotSupported( "tailing iterator not supported in secondary mode")); } else if (read_options.snapshot != nullptr) { // TODO (yanqin) support snapshot. return NewErrorIterator( Status::NotSupported("snapshot not supported in secondary mode")); } else { auto snapshot = versions_->LastSequence(); result = NewIteratorImpl(read_options, cfd, snapshot, read_callback); } return result; } ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl( const ReadOptions& read_options, ColumnFamilyData* cfd, SequenceNumber snapshot, ReadCallback* read_callback) { assert(nullptr != cfd); SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); auto db_iter = NewArenaWrappedDbIterator( env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options, snapshot, super_version->mutable_cf_options.max_sequential_skip_in_iterations, super_version->version_number, read_callback); auto internal_iter = NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), db_iter->GetRangeDelAggregator(), snapshot); db_iter->SetIterUnderDBIter(internal_iter); return db_iter; } Status DBImplSecondary::NewIterators( const ReadOptions& read_options, const std::vector& column_families, std::vector* iterators) { if (read_options.managed) { return Status::NotSupported("Managed iterator is not supported anymore."); } if (read_options.read_tier == kPersistedTier) { return Status::NotSupported( "ReadTier::kPersistedData is not yet supported in iterators."); } ReadCallback* read_callback = nullptr; // No read callback provided. if (iterators == nullptr) { return Status::InvalidArgument("iterators not allowed to be nullptr"); } iterators->clear(); iterators->reserve(column_families.size()); if (read_options.tailing) { return Status::NotSupported( "tailing iterator not supported in secondary mode"); } else if (read_options.snapshot != nullptr) { // TODO (yanqin) support snapshot. return Status::NotSupported("snapshot not supported in secondary mode"); } else { SequenceNumber read_seq = versions_->LastSequence(); for (auto cfh : column_families) { ColumnFamilyData* cfd = static_cast(cfh)->cfd(); iterators->push_back( NewIteratorImpl(read_options, cfd, read_seq, read_callback)); } } return Status::OK(); } Status DBImplSecondary::TryCatchUpWithPrimary() { assert(versions_.get() != nullptr); assert(manifest_reader_.get() != nullptr); Status s; std::unordered_set cfds_changed; InstrumentedMutexLock lock_guard(&mutex_); s = static_cast(versions_.get()) ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed); if (s.ok()) { SuperVersionContext sv_context(true /* create_superversion */); for (auto cfd : cfds_changed) { sv_context.NewSuperVersion(); cfd->InstallSuperVersion(&sv_context, &mutex_); } sv_context.Clean(); } return s; } Status DB::OpenAsSecondary(const Options& options, const std::string& dbname, const std::string& secondary_path, DB** dbptr) { *dbptr = nullptr; DBOptions db_options(options); ColumnFamilyOptions cf_options(options); std::vector column_families; column_families.emplace_back(kDefaultColumnFamilyName, cf_options); std::vector handles; Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path, column_families, &handles, dbptr); if (s.ok()) { assert(handles.size() == 1); delete handles[0]; } return s; } Status DB::OpenAsSecondary( const DBOptions& db_options, const std::string& dbname, const std::string& secondary_path, const std::vector& column_families, std::vector* handles, DB** dbptr) { *dbptr = nullptr; if (db_options.max_open_files != -1) { // TODO (yanqin) maybe support max_open_files != -1 by creating hard links // on SST files so that db secondary can still have access to old SSTs // while primary instance may delete original. return Status::InvalidArgument("require max_open_files to be -1"); } DBOptions tmp_opts(db_options); if (nullptr == tmp_opts.info_log) { Env* env = tmp_opts.env; assert(env != nullptr); std::string secondary_abs_path; env->GetAbsolutePath(secondary_path, &secondary_abs_path); std::string fname = InfoLogFileName(secondary_path, secondary_abs_path, tmp_opts.db_log_dir); env->CreateDirIfMissing(secondary_path); if (tmp_opts.log_file_time_to_roll > 0 || tmp_opts.max_log_file_size > 0) { AutoRollLogger* result = new AutoRollLogger( env, secondary_path, tmp_opts.db_log_dir, tmp_opts.max_log_file_size, tmp_opts.log_file_time_to_roll, tmp_opts.info_log_level); Status s = result->GetStatus(); if (!s.ok()) { delete result; } else { tmp_opts.info_log.reset(result); } } if (nullptr == tmp_opts.info_log) { env->RenameFile( fname, OldInfoLogFileName(secondary_path, env->NowMicros(), secondary_abs_path, tmp_opts.db_log_dir)); Status s = env->NewLogger(fname, &(tmp_opts.info_log)); if (tmp_opts.info_log != nullptr) { tmp_opts.info_log->SetInfoLogLevel(tmp_opts.info_log_level); } } } assert(tmp_opts.info_log != nullptr); handles->clear(); DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname); impl->versions_.reset(new ReactiveVersionSet( dbname, &impl->immutable_db_options_, impl->env_options_, impl->table_cache_.get(), impl->write_buffer_manager_, &impl->write_controller_)); impl->column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet())); impl->mutex_.Lock(); Status s = impl->Recover(column_families, true, false, false); if (s.ok()) { for (auto cf : column_families) { auto cfd = impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); if (nullptr == cfd) { s = Status::InvalidArgument("Column family not found: ", cf.name); break; } handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); } } SuperVersionContext sv_context(true /* create_superversion */); if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { sv_context.NewSuperVersion(); cfd->InstallSuperVersion(&sv_context, &impl->mutex_); } } impl->mutex_.Unlock(); sv_context.Clean(); if (s.ok()) { *dbptr = impl; for (auto h : *handles) { impl->NewThreadStatusCfInfo( reinterpret_cast(h)->cfd()); } } else { for (auto h : *handles) { delete h; } handles->clear(); delete impl; } return s; } #else // !ROCKSDB_LITE Status DB::OpenAsSecondary(const Options& /*options*/, const std::string& /*name*/, const std::string& /*secondary_path*/, DB** /*dbptr*/) { return Status::NotSupported("Not supported in ROCKSDB_LITE."); } Status DB::OpenAsSecondary( const DBOptions& /*db_options*/, const std::string& /*dbname*/, const std::string& /*secondary_path*/, const std::vector& /*column_families*/, std::vector* /*handles*/, DB** /*dbptr*/) { return Status::NotSupported("Not supported in ROCKSDB_LITE."); } #endif // !ROCKSDB_LITE } // namespace rocksdb