diff --git a/CMakeLists.txt b/CMakeLists.txt index 9cabf3b38..132d3b04e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -489,6 +489,7 @@ set(SOURCES db/db_impl_debug.cc db/db_impl_experimental.cc db/db_impl_readonly.cc + db/db_impl_secondary.cc db/db_info_dumper.cc db/db_iter.cc db/dbformat.cc @@ -873,6 +874,7 @@ if(WITH_TESTS) db/db_options_test.cc db/db_properties_test.cc db/db_range_del_test.cc + db/db_secondary_test.cc db/db_sst_test.cc db/db_statistics_test.cc db/db_table_properties_test.cc diff --git a/HISTORY.md b/HISTORY.md index 6046bb1e7..f63bfb97b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,24 +2,17 @@ ## Unreleased ### New Features * Introduce two more stats levels, kExceptHistogramOrTimers and kExceptTimers. +* Added a feature to perform data-block sampling for compressibility, and report stats to user. * Add support for trace filtering. - ### Public API Change * Remove bundled fbson library. * statistics.stats_level_ becomes atomic. It is preferred to use statistics.set_stats_level() and statistics.get_stats_level() to access it. - +* Introduce a new IOError subcode, PathNotFound, to indicate trying to open a nonexistent file or directory for read. +* Add initial support for multiple db instances sharing the same data in single-writer, multi-reader mode. ### Bug Fixes * Fix JEMALLOC_CXX_THROW macro missing from older Jemalloc versions, causing build failures on some platforms. * Fix SstFileReader not able to open file ingested with write_glbal_seqno=true. - -## Unreleased -### New Features -* Added a feature to perform data-block sampling for compressibility, and report stats to user. -### Public API Change -### Bug fixes - - ## 6.0.0 (2/19/2019) ### New Features * Enabled checkpoint on readonly db (DBImplReadOnly). diff --git a/Makefile b/Makefile index 7030eb48d..eee0f9fba 100644 --- a/Makefile +++ b/Makefile @@ -443,6 +443,7 @@ TESTS = \ db_merge_operator_test \ db_options_test \ db_range_del_test \ + db_secondary_test \ db_sst_test \ db_tailing_iter_test \ db_io_failure_test \ @@ -547,6 +548,7 @@ TESTS = \ range_tombstone_fragmenter_test \ range_del_aggregator_test \ sst_file_reader_test \ + db_secondary_test \ PARALLEL_TEST = \ backupable_db_test \ @@ -1571,6 +1573,9 @@ range_tombstone_fragmenter_test: db/range_tombstone_fragmenter_test.o db/db_test sst_file_reader_test: table/sst_file_reader_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_secondary_test: db/db_secondary_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + #------------------------------------------------- # make install related stuff INSTALL_PATH ?= /usr/local diff --git a/TARGETS b/TARGETS index 4590560f1..073c977e5 100644 --- a/TARGETS +++ b/TARGETS @@ -98,6 +98,7 @@ cpp_library( "db/db_impl_files.cc", "db/db_impl_open.cc", "db/db_impl_readonly.cc", + "db/db_impl_secondary.cc", "db/db_impl_write.cc", "db/db_info_dumper.cc", "db/db_iter.cc", @@ -605,6 +606,11 @@ ROCKS_TESTS = [ "db/db_range_del_test.cc", "serial", ], + [ + "db_secondary_test", + "db/db_secondary_test.cc", + "serial", + ], [ "db_sst_test", "db/db_sst_test.cc", diff --git a/db/db_impl.cc b/db/db_impl.cc index 3d819e66a..f50900e55 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -148,18 +148,21 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, immutable_db_options_(initial_db_options_), mutable_db_options_(initial_db_options_), stats_(immutable_db_options_.statistics.get()), - db_lock_(nullptr), mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS, immutable_db_options_.use_adaptive_mutex), + default_cf_handle_(nullptr), + max_total_in_memory_state_(0), + env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), + env_options_for_compaction_(env_->OptimizeForCompactionTableWrite( + env_options_, immutable_db_options_)), + db_lock_(nullptr), shutting_down_(false), bg_cv_(&mutex_), logfile_number_(0), log_dir_synced_(false), log_empty_(true), - default_cf_handle_(nullptr), log_sync_cv_(&mutex_), total_log_size_(0), - max_total_in_memory_state_(0), is_snapshot_supported_(true), write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), write_thread_(immutable_db_options_), @@ -186,9 +189,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, next_job_id_(1), has_unpersisted_data_(false), unable_to_release_oldest_log_(false), - env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), - env_options_for_compaction_(env_->OptimizeForCompactionTableWrite( - env_options_, immutable_db_options_)), num_running_ingest_file_(0), #ifndef ROCKSDB_LITE wal_manager_(immutable_db_options_, env_options_, seq_per_batch), diff --git a/db/db_impl.h b/db/db_impl.h index 9b5e6dfb8..a7419c92e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -758,6 +758,29 @@ class DBImpl : public DB { std::unique_ptr tracer_; InstrumentedMutex trace_mutex_; + // State below is protected by mutex_ + // With two_write_queues enabled, some of the variables that accessed during + // WriteToWAL need different synchronization: log_empty_, alive_log_files_, + // logs_, logfile_number_. Refer to the definition of each variable below for + // more description. + mutable InstrumentedMutex mutex_; + + ColumnFamilyHandleImpl* default_cf_handle_; + InternalStats* default_cf_internal_stats_; + + // only used for dynamically adjusting max_total_wal_size. it is a sum of + // [write_buffer_size * max_write_buffer_number] over all column families + uint64_t max_total_in_memory_state_; + // If true, we have only one (default) column family. We use this to optimize + // some code-paths + bool single_column_family_mode_; + + // The options to access storage files + const EnvOptions env_options_; + + // Additonal options for compaction and flush + EnvOptions env_options_for_compaction_; + // Except in DB::Open(), WriteOptionsFile can only be called when: // Persist options to options file. // If need_mutex_lock = false, the method will lock DB mutex. @@ -845,6 +868,14 @@ class DBImpl : public DB { // Actual implementation of Close() Status CloseImpl(); + // Recover the descriptor from persistent storage. May do a significant + // amount of work to recover recently logged updates. Any changes to + // be made to the descriptor are added to *edit. + virtual Status Recover( + const std::vector& column_families, + bool read_only = false, bool error_if_log_file_exist = false, + bool error_if_data_exists_in_logs = false); + private: friend class DB; friend class ErrorHandler; @@ -893,13 +924,6 @@ class DBImpl : public DB { struct PrepickedCompaction; struct PurgeFileInfo; - // Recover the descriptor from persistent storage. May do a significant - // amount of work to recover recently logged updates. Any changes to - // be made to the descriptor are added to *edit. - Status Recover(const std::vector& column_families, - bool read_only = false, bool error_if_log_file_exist = false, - bool error_if_data_exists_in_logs = false); - Status ResumeImpl(); void MaybeIgnoreError(Status* s) const; @@ -1216,12 +1240,6 @@ class DBImpl : public DB { // and log_empty_. Refer to the definition of each variable below for more // details. InstrumentedMutex log_write_mutex_; - // State below is protected by mutex_ - // With two_write_queues enabled, some of the variables that accessed during - // WriteToWAL need different synchronization: log_empty_, alive_log_files_, - // logs_, logfile_number_. Refer to the definition of each variable below for - // more description. - mutable InstrumentedMutex mutex_; std::atomic shutting_down_; // This condition variable is signaled on these conditions: @@ -1253,8 +1271,7 @@ class DBImpl : public DB { // read and writes are protected by log_write_mutex_ instead. This is to avoid // expesnive mutex_ lock during WAL write, which update log_empty_. bool log_empty_; - ColumnFamilyHandleImpl* default_cf_handle_; - InternalStats* default_cf_internal_stats_; + std::unique_ptr column_family_memtables_; struct LogFileNumberSize { explicit LogFileNumberSize(uint64_t _number) @@ -1321,12 +1338,7 @@ class DBImpl : public DB { WriteBatch cached_recoverable_state_; std::atomic cached_recoverable_state_empty_ = {true}; std::atomic total_log_size_; - // only used for dynamically adjusting max_total_wal_size. it is a sum of - // [write_buffer_size * max_write_buffer_number] over all column families - uint64_t max_total_in_memory_state_; - // If true, we have only one (default) column family. We use this to optimize - // some code-paths - bool single_column_family_mode_; + // If this is non-empty, we need to delete these log files in background // threads. Protected by db mutex. autovector logs_to_free_; @@ -1545,12 +1557,6 @@ class DBImpl : public DB { std::string db_absolute_path_; - // The options to access storage files - const EnvOptions env_options_; - - // Additonal options for compaction and flush - EnvOptions env_options_for_compaction_; - // Number of running IngestExternalFile() calls. // REQUIRES: mutex held int num_running_ingest_file_; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 52ee53748..99c27f45d 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -629,8 +629,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // to be skipped instead of propagating bad information (like overly // large sequence numbers). log::Reader reader(immutable_db_options_.info_log, std::move(file_reader), - &reporter, true /*checksum*/, log_number, - false /* retry_after_eof */); + &reporter, true /*checksum*/, log_number); // Determine if we should tolerate incomplete records at the tail end of the // Read all the records and add to a memtable diff --git a/db/db_impl_secondary.cc b/db/db_impl_secondary.cc new file mode 100644 index 000000000..acc952524 --- /dev/null +++ b/db/db_impl_secondary.cc @@ -0,0 +1,356 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_impl_secondary.h" +#include "db/db_iter.h" +#include "db/merge_context.h" +#include "monitoring/perf_context_imp.h" +#include "util/auto_roll_logger.h" + +namespace rocksdb { + +#ifndef ROCKSDB_LITE + +DBImplSecondary::DBImplSecondary(const DBOptions& db_options, + const std::string& dbname) + : DBImpl(db_options, dbname) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Opening the db in secondary mode"); + LogFlush(immutable_db_options_.info_log); +} + +DBImplSecondary::~DBImplSecondary() {} + +Status DBImplSecondary::Recover( + const std::vector& column_families, + bool /*readonly*/, bool /*error_if_log_file_exist*/, + bool /*error_if_data_exists_in_logs*/) { + mutex_.AssertHeld(); + + Status s; + s = static_cast(versions_.get()) + ->Recover(column_families, &manifest_reader_, &manifest_reporter_, + &manifest_reader_status_); + if (!s.ok()) { + return s; + } + if (immutable_db_options_.paranoid_checks && s.ok()) { + s = CheckConsistency(); + } + // Initial max_total_in_memory_state_ before recovery logs. + max_total_in_memory_state_ = 0; + for (auto cfd : *versions_->GetColumnFamilySet()) { + auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + max_total_in_memory_state_ += mutable_cf_options->write_buffer_size * + mutable_cf_options->max_write_buffer_number; + } + if (s.ok()) { + default_cf_handle_ = new ColumnFamilyHandleImpl( + versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); + default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); + single_column_family_mode_ = + versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1; + } + + // TODO: attempt to recover from WAL files. + return s; +} + +// Implementation of the DB interface +Status DBImplSecondary::Get(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value) { + return GetImpl(read_options, column_family, key, value); +} + +Status DBImplSecondary::GetImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* pinnable_val) { + assert(pinnable_val != nullptr); + PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); + StopWatch sw(env_, stats_, DB_GET); + PERF_TIMER_GUARD(get_snapshot_time); + + auto cfh = static_cast(column_family); + ColumnFamilyData* cfd = cfh->cfd(); + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + tracer_->Get(column_family, key); + } + } + // Acquire SuperVersion + SuperVersion* super_version = GetAndRefSuperVersion(cfd); + SequenceNumber snapshot = versions_->LastSequence(); + MergeContext merge_context; + SequenceNumber max_covering_tombstone_seq = 0; + Status s; + LookupKey lkey(key, snapshot); + PERF_TIMER_STOP(get_snapshot_time); + + bool done = false; + if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, + &max_covering_tombstone_seq, read_options)) { + done = true; + pinnable_val->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } else if ((s.ok() || s.IsMergeInProgress()) && + super_version->imm->Get( + lkey, pinnable_val->GetSelf(), &s, &merge_context, + &max_covering_tombstone_seq, read_options)) { + done = true; + pinnable_val->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } + if (!done && !s.ok() && !s.IsMergeInProgress()) { + ReturnAndCleanupSuperVersion(cfd, super_version); + return s; + } + if (!done) { + PERF_TIMER_GUARD(get_from_output_files_time); + super_version->current->Get(read_options, lkey, pinnable_val, &s, + &merge_context, &max_covering_tombstone_seq); + RecordTick(stats_, MEMTABLE_MISS); + } + { + PERF_TIMER_GUARD(get_post_process_time); + ReturnAndCleanupSuperVersion(cfd, super_version); + RecordTick(stats_, NUMBER_KEYS_READ); + size_t size = pinnable_val->size(); + RecordTick(stats_, BYTES_READ, size); + RecordTimeToHistogram(stats_, BYTES_PER_READ, size); + PERF_COUNTER_ADD(get_read_bytes, size); + } + return s; +} + +Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options, + ColumnFamilyHandle* column_family) { + if (read_options.managed) { + return NewErrorIterator( + Status::NotSupported("Managed iterator is not supported anymore.")); + } + if (read_options.read_tier == kPersistedTier) { + return NewErrorIterator(Status::NotSupported( + "ReadTier::kPersistedData is not yet supported in iterators.")); + } + Iterator* result = nullptr; + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); + ReadCallback* read_callback = nullptr; // No read callback provided. + if (read_options.tailing) { + return NewErrorIterator(Status::NotSupported( + "tailing iterator not supported in secondary mode")); + } else if (read_options.snapshot != nullptr) { + // TODO (yanqin) support snapshot. + return NewErrorIterator( + Status::NotSupported("snapshot not supported in secondary mode")); + } else { + auto snapshot = versions_->LastSequence(); + result = NewIteratorImpl(read_options, cfd, snapshot, read_callback); + } + return result; +} + +ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl( + const ReadOptions& read_options, ColumnFamilyData* cfd, + SequenceNumber snapshot, ReadCallback* read_callback) { + assert(nullptr != cfd); + SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); + auto db_iter = NewArenaWrappedDbIterator( + env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options, + snapshot, + super_version->mutable_cf_options.max_sequential_skip_in_iterations, + super_version->version_number, read_callback); + auto internal_iter = + NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), + db_iter->GetRangeDelAggregator(), snapshot); + db_iter->SetIterUnderDBIter(internal_iter); + return db_iter; +} + +Status DBImplSecondary::NewIterators( + const ReadOptions& read_options, + const std::vector& column_families, + std::vector* iterators) { + if (read_options.managed) { + return Status::NotSupported("Managed iterator is not supported anymore."); + } + if (read_options.read_tier == kPersistedTier) { + return Status::NotSupported( + "ReadTier::kPersistedData is not yet supported in iterators."); + } + ReadCallback* read_callback = nullptr; // No read callback provided. + if (iterators == nullptr) { + return Status::InvalidArgument("iterators not allowed to be nullptr"); + } + iterators->clear(); + iterators->reserve(column_families.size()); + if (read_options.tailing) { + return Status::NotSupported( + "tailing iterator not supported in secondary mode"); + } else if (read_options.snapshot != nullptr) { + // TODO (yanqin) support snapshot. + return Status::NotSupported("snapshot not supported in secondary mode"); + } else { + SequenceNumber read_seq = versions_->LastSequence(); + for (auto cfh : column_families) { + ColumnFamilyData* cfd = static_cast(cfh)->cfd(); + iterators->push_back( + NewIteratorImpl(read_options, cfd, read_seq, read_callback)); + } + } + return Status::OK(); +} + +Status DBImplSecondary::TryCatchUpWithPrimary() { + assert(versions_.get() != nullptr); + assert(manifest_reader_.get() != nullptr); + Status s; + std::unordered_set cfds_changed; + InstrumentedMutexLock lock_guard(&mutex_); + s = static_cast(versions_.get()) + ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed); + if (s.ok()) { + SuperVersionContext sv_context(true /* create_superversion */); + for (auto cfd : cfds_changed) { + sv_context.NewSuperVersion(); + cfd->InstallSuperVersion(&sv_context, &mutex_); + } + sv_context.Clean(); + } + return s; +} + +Status DB::OpenAsSecondary(const Options& options, const std::string& dbname, + const std::string& secondary_path, DB** dbptr) { + *dbptr = nullptr; + + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.emplace_back(kDefaultColumnFamilyName, cf_options); + std::vector handles; + + Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path, + column_families, &handles, dbptr); + if (s.ok()) { + assert(handles.size() == 1); + delete handles[0]; + } + return s; +} + +Status DB::OpenAsSecondary( + const DBOptions& db_options, const std::string& dbname, + const std::string& secondary_path, + const std::vector& column_families, + std::vector* handles, DB** dbptr) { + *dbptr = nullptr; + if (db_options.max_open_files != -1) { + // TODO (yanqin) maybe support max_open_files != -1 by creating hard links + // on SST files so that db secondary can still have access to old SSTs + // while primary instance may delete original. + return Status::InvalidArgument("require max_open_files to be -1"); + } + + DBOptions tmp_opts(db_options); + if (nullptr == tmp_opts.info_log) { + Env* env = tmp_opts.env; + assert(env != nullptr); + std::string secondary_abs_path; + env->GetAbsolutePath(secondary_path, &secondary_abs_path); + std::string fname = InfoLogFileName(secondary_path, secondary_abs_path, + tmp_opts.db_log_dir); + + env->CreateDirIfMissing(secondary_path); + if (tmp_opts.log_file_time_to_roll > 0 || tmp_opts.max_log_file_size > 0) { + AutoRollLogger* result = new AutoRollLogger( + env, secondary_path, tmp_opts.db_log_dir, tmp_opts.max_log_file_size, + tmp_opts.log_file_time_to_roll, tmp_opts.info_log_level); + Status s = result->GetStatus(); + if (!s.ok()) { + delete result; + } else { + tmp_opts.info_log.reset(result); + } + } + if (nullptr == tmp_opts.info_log) { + env->RenameFile( + fname, OldInfoLogFileName(secondary_path, env->NowMicros(), + secondary_abs_path, tmp_opts.db_log_dir)); + Status s = env->NewLogger(fname, &(tmp_opts.info_log)); + if (tmp_opts.info_log != nullptr) { + tmp_opts.info_log->SetInfoLogLevel(tmp_opts.info_log_level); + } + } + } + + assert(tmp_opts.info_log != nullptr); + + handles->clear(); + DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname); + impl->versions_.reset(new ReactiveVersionSet( + dbname, &impl->immutable_db_options_, impl->env_options_, + impl->table_cache_.get(), impl->write_buffer_manager_, + &impl->write_controller_)); + impl->column_family_memtables_.reset( + new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet())); + impl->mutex_.Lock(); + Status s = impl->Recover(column_families, true, false, false); + if (s.ok()) { + for (auto cf : column_families) { + auto cfd = + impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); + if (nullptr == cfd) { + s = Status::InvalidArgument("Column family not found: ", cf.name); + break; + } + handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); + } + } + SuperVersionContext sv_context(true /* create_superversion */); + if (s.ok()) { + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + sv_context.NewSuperVersion(); + cfd->InstallSuperVersion(&sv_context, &impl->mutex_); + } + } + impl->mutex_.Unlock(); + sv_context.Clean(); + if (s.ok()) { + *dbptr = impl; + for (auto h : *handles) { + impl->NewThreadStatusCfInfo( + reinterpret_cast(h)->cfd()); + } + } else { + for (auto h : *handles) { + delete h; + } + handles->clear(); + delete impl; + } + return s; +} +#else // !ROCKSDB_LITE + +Status DB::OpenAsSecondary(const Options& /*options*/, + const std::string& /*name*/, + const std::string& /*secondary_path*/, + DB** /*dbptr*/) { + return Status::NotSupported("Not supported in ROCKSDB_LITE."); +} + +Status DB::OpenAsSecondary( + const DBOptions& /*db_options*/, const std::string& /*dbname*/, + const std::string& /*secondary_path*/, + const std::vector& /*column_families*/, + std::vector* /*handles*/, DB** /*dbptr*/) { + return Status::NotSupported("Not supported in ROCKSDB_LITE."); +} +#endif // !ROCKSDB_LITE + +} // namespace rocksdb diff --git a/db/db_impl_secondary.h b/db/db_impl_secondary.h new file mode 100644 index 000000000..1b6746f7e --- /dev/null +++ b/db/db_impl_secondary.h @@ -0,0 +1,151 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include "db/db_impl.h" + +namespace rocksdb { + +class DBImplSecondary : public DBImpl { + public: + DBImplSecondary(const DBOptions& options, const std::string& dbname); + ~DBImplSecondary() override; + + Status Recover(const std::vector& column_families, + bool read_only, bool error_if_log_file_exist, + bool error_if_data_exists_in_logs) override; + + // Implementations of the DB interface + using DB::Get; + Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value) override; + + Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value); + + using DBImpl::NewIterator; + Iterator* NewIterator(const ReadOptions&, + ColumnFamilyHandle* column_family) override; + + ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options, + ColumnFamilyData* cfd, + SequenceNumber snapshot, + ReadCallback* read_callback); + + Status NewIterators(const ReadOptions& options, + const std::vector& column_families, + std::vector* iterators) override; + + using DBImpl::Put; + Status Put(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, + const Slice& /*value*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DBImpl::Merge; + Status Merge(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, + const Slice& /*value*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DBImpl::Delete; + Status Delete(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*key*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DBImpl::SingleDelete; + Status SingleDelete(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*key*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + Status Write(const WriteOptions& /*options*/, + WriteBatch* /*updates*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DBImpl::CompactRange; + Status CompactRange(const CompactRangeOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice* /*begin*/, const Slice* /*end*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DBImpl::CompactFiles; + Status CompactFiles( + const CompactionOptions& /*compact_options*/, + ColumnFamilyHandle* /*column_family*/, + const std::vector& /*input_file_names*/, + const int /*output_level*/, const int /*output_path_id*/ = -1, + std::vector* const /*output_file_names*/ = nullptr, + CompactionJobInfo* /*compaction_job_info*/ = nullptr) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + Status DisableFileDeletions() override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + Status EnableFileDeletions(bool /*force*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + Status GetLiveFiles(std::vector&, + uint64_t* /*manifest_file_size*/, + bool /*flush_memtable*/ = true) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DBImpl::Flush; + Status Flush(const FlushOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DBImpl::SyncWAL; + Status SyncWAL() override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DB::IngestExternalFile; + Status IngestExternalFile( + ColumnFamilyHandle* /*column_family*/, + const std::vector& /*external_files*/, + const IngestExternalFileOptions& /*ingestion_options*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + // Try to catch up with the primary by reading as much as possible from the + // log files until there is nothing more to read or encounters an error. If + // the amount of information in the log files to process is huge, this + // method can take long time due to all the I/O and CPU costs. + Status TryCatchUpWithPrimary() override; + + private: + friend class DB; + + // No copying allowed + DBImplSecondary(const DBImplSecondary&); + void operator=(const DBImplSecondary&); + + using DBImpl::Recover; + + std::unique_ptr manifest_reader_; + std::unique_ptr manifest_reporter_; + std::unique_ptr manifest_reader_status_; +}; +} // namespace rocksdb + +#endif // !ROCKSDB_LITE diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc new file mode 100644 index 000000000..478a7cec9 --- /dev/null +++ b/db/db_secondary_test.cc @@ -0,0 +1,480 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_impl_secondary.h" +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "util/fault_injection_test_env.h" +#include "util/sync_point.h" + +namespace rocksdb { + +#ifndef ROCKSDB_LITE +class DBSecondaryTest : public DBTestBase { + public: + DBSecondaryTest() + : DBTestBase("/db_secondary_test"), + secondary_path_(), + handles_secondary_(), + db_secondary_(nullptr) { + secondary_path_ = + test::PerThreadDBPath(env_, "/db_secondary_test_secondary"); + } + + ~DBSecondaryTest() override { + CloseSecondary(); + if (getenv("KEEP_DB") != nullptr) { + fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str()); + } else { + Options options; + options.env = env_; + EXPECT_OK(DestroyDB(secondary_path_, options)); + } + } + + protected: + Status ReopenAsSecondary(const Options& options) { + return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_); + } + + void OpenSecondary(const Options& options); + + void OpenSecondaryWithColumnFamilies( + const std::vector& column_families, const Options& options); + + void CloseSecondary() { + for (auto h : handles_secondary_) { + db_secondary_->DestroyColumnFamilyHandle(h); + } + handles_secondary_.clear(); + delete db_secondary_; + db_secondary_ = nullptr; + } + + DBImplSecondary* db_secondary_full() { + return static_cast(db_secondary_); + } + + void CheckFileTypeCounts(const std::string& dir, int expected_log, + int expected_sst, int expected_manifest) const; + + std::string secondary_path_; + std::vector handles_secondary_; + DB* db_secondary_; +}; + +void DBSecondaryTest::OpenSecondary(const Options& options) { + Status s = + DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_); + ASSERT_OK(s); +} + +void DBSecondaryTest::OpenSecondaryWithColumnFamilies( + const std::vector& column_families, const Options& options) { + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + for (const auto& cf_name : column_families) { + cf_descs.emplace_back(cf_name, options); + } + Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs, + &handles_secondary_, &db_secondary_); + ASSERT_OK(s); +} + +void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir, + int expected_log, int expected_sst, + int expected_manifest) const { + std::vector filenames; + env_->GetChildren(dir, &filenames); + + int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0; + for (auto file : filenames) { + uint64_t number; + FileType type; + if (ParseFileName(file, &number, &type)) { + log_cnt += (type == kLogFile); + sst_cnt += (type == kTableFile); + manifest_cnt += (type == kDescriptorFile); + } + } + ASSERT_EQ(expected_log, log_cnt); + ASSERT_EQ(expected_sst, sst_cnt); + ASSERT_EQ(expected_manifest, manifest_cnt); +} + +TEST_F(DBSecondaryTest, ReopenAsSecondary) { + Options options; + options.env = env_; + Reopen(options); + ASSERT_OK(Put("foo", "foo_value")); + ASSERT_OK(Put("bar", "bar_value")); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + Close(); + + ASSERT_OK(ReopenAsSecondary(options)); + ASSERT_EQ("foo_value", Get("foo")); + ASSERT_EQ("bar_value", Get("bar")); + ReadOptions ropts; + ropts.verify_checksums = true; + auto db1 = static_cast(db_); + ASSERT_NE(nullptr, db1); + Iterator* iter = db1->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + if (0 == count) { + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value", iter->value().ToString()); + } else if (1 == count) { + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value", iter->value().ToString()); + } + ++count; + } + delete iter; + ASSERT_EQ(2, count); +} + +TEST_F(DBSecondaryTest, OpenAsSecondary) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ReadOptions ropts; + ropts.verify_checksums = true; + const auto verify_db_func = [&](const std::string& foo_val, + const std::string& bar_val) { + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ(foo_val, value); + ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); + ASSERT_EQ(bar_val, value); + Iterator* iter = db_secondary_->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ(foo_val, iter->value().ToString()); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ(bar_val, iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; + }; + + verify_db_func("foo_value2", "bar_value2"); + + ASSERT_OK(Put("foo", "new_foo_value")); + ASSERT_OK(Put("bar", "new_bar_value")); + ASSERT_OK(Flush()); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db_func("new_foo_value", "new_bar_value"); +} + +TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) { + Options options; + options.env = env_; + CreateAndReopenWithCF({"pikachu"}, options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options1); + cf_descs.emplace_back("pikachu", options1); + cf_descs.emplace_back("eevee", options1); + Status s = DB::OpenAsSecondary(options1, dbname_, secondary_path_, cf_descs, + &handles_secondary_, &db_secondary_); + ASSERT_NOK(s); +} + +TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) { + Options options; + options.env = env_; + CreateAndReopenWithCF({"pikachu"}, options); + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + ASSERT_EQ(0, handles_secondary_.size()); + ASSERT_NE(nullptr, db_secondary_); + + ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value")); + ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value")); + ASSERT_OK(Flush(0 /*cf*/)); + ASSERT_OK(Flush(1 /*cf*/)); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ("foo_value", value); +} + +TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) { + Options options; + options.env = env_; + Reopen(options); + Close(); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->LoadDependency( + {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0", + "VersionSet::ProcessManifestWrites:BeforeNewManifest"}, + {"VersionSet::ProcessManifestWrites:AfterNewManifest", + "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:" + "1"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + // Make sure db calls RecoverLogFiles so as to trigger a manifest write, + // which causes the db to switch to a new MANIFEST upon start. + port::Thread ro_db_thread([&]() { + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + CloseSecondary(); + }); + Reopen(options); + ro_db_thread.join(); +} + +TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + } + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + Iterator* iter = db_secondary_->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; +} + +TEST_F(DBSecondaryTest, MissingTableFile) { + int table_files_not_exist = 0; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", + [&](void* arg) { + Status s = *reinterpret_cast(arg); + if (s.IsPathNotFound()) { + ++table_files_not_exist; + } else if (!s.ok()) { + assert(false); // Should not reach here + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + } + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ASSERT_NE(nullptr, db_secondary_full()); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value)); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist); + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + Iterator* iter = db_secondary_->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; +} + +TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) { + Options options; + options.env = env_; + const std::string kCfName1 = "pikachu"; + CreateAndReopenWithCF({kCfName1}, options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondaryWithColumnFamilies({kCfName1}, options1); + ASSERT_EQ(2, handles_secondary_.size()); + + ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1")); + ASSERT_OK(Flush(1 /*cf*/)); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value)); + ASSERT_EQ("foo_val_1", value); + + ASSERT_OK(dbfull()->DropColumnFamily(handles_[1])); + Close(); + CheckFileTypeCounts(dbname_, 1, 0, 1); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + value.clear(); + ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value)); + ASSERT_EQ("foo_val_1", value); +} + +TEST_F(DBSecondaryTest, SwitchManifest) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + const int kNumFiles = options.level0_file_num_compaction_trigger - 1; + // Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1, + // ..., 9. + const int kNumKeys = 10; + // Create two sst + for (int i = 0; i != kNumFiles; ++i) { + for (int j = 0; j != kNumKeys; ++j) { + ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i))); + } + ASSERT_OK(Flush()); + } + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + const auto& range_scan_db = [&]() { + ReadOptions tmp_ropts; + tmp_ropts.total_order_seek = true; + tmp_ropts.verify_checksums = true; + std::unique_ptr iter(db_secondary_->NewIterator(tmp_ropts)); + int cnt = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) { + ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString()); + ASSERT_EQ("value_" + std::to_string(kNumFiles - 1), + iter->value().ToString()); + } + }; + + range_scan_db(); + + // While secondary instance still keeps old MANIFEST open, we close primary, + // restart primary, performs full compaction, close again, restart again so + // that next time secondary tries to catch up with primary, the secondary + // will skip the MANIFEST in middle. + Reopen(options); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + Reopen(options); + ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + range_scan_db(); +} +#endif //! ROCKSDB_LITE + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/log_reader.cc b/db/log_reader.cc index 2c57cde5d..e734e9d6c 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -24,8 +24,7 @@ Reader::Reporter::~Reporter() { Reader::Reader(std::shared_ptr info_log, std::unique_ptr&& _file, - Reporter* reporter, bool checksum, uint64_t log_num, - bool retry_after_eof) + Reporter* reporter, bool checksum, uint64_t log_num) : info_log_(info_log), file_(std::move(_file)), reporter_(reporter), @@ -38,8 +37,7 @@ Reader::Reader(std::shared_ptr info_log, last_record_offset_(0), end_of_buffer_offset_(0), log_number_(log_num), - recycled_(false), - retry_after_eof_(retry_after_eof) {} + recycled_(false) {} Reader::~Reader() { delete[] backing_store_; @@ -207,14 +205,14 @@ void Reader::UnmarkEOF() { if (read_error_) { return; } - eof_ = false; - - // If retry_after_eof_ is true, we have to proceed to read anyway. - if (!retry_after_eof_ && eof_offset_ == 0) { + if (eof_offset_ == 0) { return; } + UnmarkEOFInternal(); +} +void Reader::UnmarkEOFInternal() { // If the EOF was in the middle of a block (a partial block was read) we have // to read the rest of the block as ReadPhysicalRecord can only read full // blocks and expects the file position indicator to be aligned to the start @@ -292,12 +290,8 @@ bool Reader::ReadMore(size_t* drop_size, int *error) { } else if (buffer_.size() < static_cast(kBlockSize)) { eof_ = true; eof_offset_ = buffer_.size(); - TEST_SYNC_POINT("LogReader::ReadMore:FirstEOF"); } return true; - } else if (retry_after_eof_ && !read_error_) { - UnmarkEOF(); - return !read_error_; } else { // Note that if buffer_ is non-empty, we have a truncated header at the // end of the file, which can be caused by the writer crashing in the @@ -355,24 +349,16 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { } } if (header_size + length > buffer_.size()) { - if (!retry_after_eof_) { - *drop_size = buffer_.size(); - buffer_.clear(); - if (!eof_) { - return kBadRecordLen; - } - // If the end of the file has been reached without reading |length| - // bytes of payload, assume the writer died in the middle of writing the - // record. Don't report a corruption unless requested. - if (*drop_size) { - return kBadHeader; - } - } else { - int r = kEof; - if (!ReadMore(drop_size, &r)) { - return r; - } - continue; + *drop_size = buffer_.size(); + buffer_.clear(); + if (!eof_) { + return kBadRecordLen; + } + // If the end of the file has been reached without reading |length| + // bytes of payload, assume the writer died in the middle of writing the + // record. Don't report a corruption unless requested. + if (*drop_size) { + return kBadHeader; } return kEof; } @@ -409,5 +395,229 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { } } +bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, + WALRecoveryMode /*unused*/) { + assert(record != nullptr); + assert(scratch != nullptr); + record->clear(); + scratch->clear(); + + uint64_t prospective_record_offset = 0; + uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); + size_t drop_size = 0; + unsigned int fragment_type_or_err = 0; // Initialize to make compiler happy + Slice fragment; + while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) { + switch (fragment_type_or_err) { + case kFullType: + case kRecyclableFullType: + if (in_fragmented_record_ && !fragments_.empty()) { + ReportCorruption(fragments_.size(), "partial record without end(1)"); + } + fragments_.clear(); + *record = fragment; + prospective_record_offset = physical_record_offset; + last_record_offset_ = prospective_record_offset; + in_fragmented_record_ = false; + return true; + + case kFirstType: + case kRecyclableFirstType: + if (in_fragmented_record_ || !fragments_.empty()) { + ReportCorruption(fragments_.size(), "partial record without end(2)"); + } + prospective_record_offset = physical_record_offset; + fragments_.assign(fragment.data(), fragment.size()); + in_fragmented_record_ = true; + break; + + case kMiddleType: + case kRecyclableMiddleType: + if (!in_fragmented_record_) { + ReportCorruption(fragment.size(), + "missing start of fragmented record(1)"); + } else { + fragments_.append(fragment.data(), fragment.size()); + } + break; + + case kLastType: + case kRecyclableLastType: + if (!in_fragmented_record_) { + ReportCorruption(fragment.size(), + "missing start of fragmented record(2)"); + } else { + fragments_.append(fragment.data(), fragment.size()); + scratch->assign(fragments_.data(), fragments_.size()); + fragments_.clear(); + *record = Slice(*scratch); + last_record_offset_ = prospective_record_offset; + in_fragmented_record_ = false; + return true; + } + break; + + case kBadHeader: + case kBadRecord: + case kEof: + case kOldRecord: + if (in_fragmented_record_) { + ReportCorruption(fragments_.size(), "error in middle of record"); + in_fragmented_record_ = false; + fragments_.clear(); + } + break; + + case kBadRecordChecksum: + if (recycled_) { + fragments_.clear(); + return false; + } + ReportCorruption(drop_size, "checksum mismatch"); + if (in_fragmented_record_) { + ReportCorruption(fragments_.size(), "error in middle of record"); + in_fragmented_record_ = false; + fragments_.clear(); + } + break; + + default: { + char buf[40]; + snprintf(buf, sizeof(buf), "unknown record type %u", + fragment_type_or_err); + ReportCorruption( + fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0), + buf); + in_fragmented_record_ = false; + fragments_.clear(); + break; + } + } + } + return false; +} + +void FragmentBufferedReader::UnmarkEOF() { + if (read_error_) { + return; + } + eof_ = false; + UnmarkEOFInternal(); +} + +bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) { + if (!eof_ && !read_error_) { + // Last read was a full read, so this is a trailer to skip + buffer_.clear(); + Status status = file_->Read(kBlockSize, &buffer_, backing_store_); + end_of_buffer_offset_ += buffer_.size(); + if (!status.ok()) { + buffer_.clear(); + ReportDrop(kBlockSize, status); + read_error_ = true; + *error = kEof; + return false; + } else if (buffer_.size() < static_cast(kBlockSize)) { + eof_ = true; + eof_offset_ = buffer_.size(); + TEST_SYNC_POINT_CALLBACK( + "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr); + } + return true; + } else if (!read_error_) { + UnmarkEOF(); + } + if (!read_error_) { + return true; + } + *error = kEof; + *drop_size = buffer_.size(); + if (buffer_.size() > 0) { + *error = kBadHeader; + } + buffer_.clear(); + return false; +} + +// return true if the caller should process the fragment_type_or_err. +bool FragmentBufferedReader::TryReadFragment( + Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) { + assert(fragment != nullptr); + assert(drop_size != nullptr); + assert(fragment_type_or_err != nullptr); + + while (buffer_.size() < static_cast(kHeaderSize)) { + size_t old_size = buffer_.size(); + int error = kEof; + if (!TryReadMore(drop_size, &error)) { + *fragment_type_or_err = error; + return false; + } else if (old_size == buffer_.size()) { + return false; + } + } + const char* header = buffer_.data(); + const uint32_t a = static_cast(header[4]) & 0xff; + const uint32_t b = static_cast(header[5]) & 0xff; + const unsigned int type = header[6]; + const uint32_t length = a | (b << 8); + int header_size = kHeaderSize; + if (type >= kRecyclableFullType && type <= kRecyclableLastType) { + if (end_of_buffer_offset_ - buffer_.size() == 0) { + recycled_ = true; + } + header_size = kRecyclableHeaderSize; + while (buffer_.size() < static_cast(kRecyclableHeaderSize)) { + size_t old_size = buffer_.size(); + int error = kEof; + if (!TryReadMore(drop_size, &error)) { + *fragment_type_or_err = error; + return false; + } else if (old_size == buffer_.size()) { + return false; + } + } + const uint32_t log_num = DecodeFixed32(header + 7); + if (log_num != log_number_) { + *fragment_type_or_err = kOldRecord; + return true; + } + } + + while (header_size + length > buffer_.size()) { + size_t old_size = buffer_.size(); + int error = kEof; + if (!TryReadMore(drop_size, &error)) { + *fragment_type_or_err = error; + return false; + } else if (old_size == buffer_.size()) { + return false; + } + } + + if (type == kZeroType && length == 0) { + buffer_.clear(); + *fragment_type_or_err = kBadRecord; + return true; + } + + if (checksum_) { + uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); + uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6); + if (actual_crc != expected_crc) { + *drop_size = buffer_.size(); + buffer_.clear(); + *fragment_type_or_err = kBadRecordChecksum; + return true; + } + } + + buffer_.remove_prefix(header_size + length); + + *fragment = Slice(header + header_size, length); + *fragment_type_or_err = type; + return true; +} + } // namespace log } // namespace rocksdb diff --git a/db/log_reader.h b/db/log_reader.h index 2c4f4f059..63777d6da 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -53,18 +53,18 @@ class Reader { Reader(std::shared_ptr info_log, // @lint-ignore TXT2 T25377293 Grandfathered in std::unique_ptr&& file, Reporter* reporter, - bool checksum, uint64_t log_num, bool retry_after_eof); + bool checksum, uint64_t log_num); - ~Reader(); + virtual ~Reader(); // Read the next record into *record. Returns true if read // successfully, false if we hit end of the input. May use // "*scratch" as temporary storage. The contents filled in *record // will only be valid until the next mutating operation on this // reader or the next mutation to *scratch. - bool ReadRecord(Slice* record, std::string* scratch, - WALRecoveryMode wal_recovery_mode = - WALRecoveryMode::kTolerateCorruptedTailRecords); + virtual bool ReadRecord(Slice* record, std::string* scratch, + WALRecoveryMode wal_recovery_mode = + WALRecoveryMode::kTolerateCorruptedTailRecords); // Returns the physical offset of the last record returned by ReadRecord. // @@ -76,21 +76,28 @@ class Reader { return eof_; } + // returns true if the reader has encountered read error. + bool hasReadError() const { return read_error_; } + // when we know more data has been written to the file. we can use this // function to force the reader to look again in the file. // Also aligns the file position indicator to the start of the next block // by reading the rest of the data from the EOF position to the end of the // block that was partially read. - void UnmarkEOF(); + virtual void UnmarkEOF(); SequentialFileReader* file() { return file_.get(); } - private: + Reporter* GetReporter() const { return reporter_; } + + protected: std::shared_ptr info_log_; const std::unique_ptr file_; Reporter* const reporter_; bool const checksum_; char* const backing_store_; + + // Internal state variables used for reading records Slice buffer_; bool eof_; // Last Read() indicated EOF by returning < kBlockSize bool read_error_; // Error occurred while reading from file @@ -110,11 +117,6 @@ class Reader { // Whether this is a recycled log file bool recycled_; - // Whether retry after encountering EOF - // TODO (yanqin) add support for retry policy, e.g. sleep, max retry limit, - // etc. - const bool retry_after_eof_; - // Extend record types with the following special values enum { kEof = kMaxRecordType + 1, @@ -139,15 +141,47 @@ class Reader { // Read some more bool ReadMore(size_t* drop_size, int *error); + void UnmarkEOFInternal(); + // Reports dropped bytes to the reporter. // buffer_ must be updated to remove the dropped bytes prior to invocation. void ReportCorruption(size_t bytes, const char* reason); void ReportDrop(size_t bytes, const Status& reason); + private: // No copying allowed Reader(const Reader&); void operator=(const Reader&); }; +class FragmentBufferedReader : public Reader { + public: + FragmentBufferedReader(std::shared_ptr info_log, + // @lint-ignore TXT2 T25377293 Grandfathered in + std::unique_ptr&& _file, + Reporter* reporter, bool checksum, uint64_t log_num) + : Reader(info_log, std::move(_file), reporter, checksum, log_num), + fragments_(), + in_fragmented_record_(false) {} + ~FragmentBufferedReader() override {} + bool ReadRecord(Slice* record, std::string* scratch, + WALRecoveryMode wal_recovery_mode = + WALRecoveryMode::kTolerateCorruptedTailRecords) override; + void UnmarkEOF() override; + + private: + std::string fragments_; + bool in_fragmented_record_; + + bool TryReadFragment(Slice* result, size_t* drop_size, + unsigned int* fragment_type_or_err); + + bool TryReadMore(size_t* drop_size, int* error); + + // No copy allowed + FragmentBufferedReader(const FragmentBufferedReader&); + void operator=(const FragmentBufferedReader&); +}; + } // namespace log } // namespace rocksdb diff --git a/db/log_test.cc b/db/log_test.cc index 834dec7cd..fd237b030 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -43,7 +43,10 @@ static std::string RandomSkewedString(int i, Random* rnd) { return BigString(NumberString(i), rnd->Skewed(17)); } -class LogTest : public ::testing::TestWithParam { +// Param type is tuple +// get<0>(tuple): non-zero if recycling log, zero if regular log +// get<1>(tuple): true if allow retry after read EOF, false otherwise +class LogTest : public ::testing::TestWithParam> { private: class StringSource : public SequentialFile { public: @@ -53,16 +56,20 @@ class LogTest : public ::testing::TestWithParam { bool force_eof_; size_t force_eof_position_; bool returned_partial_; - explicit StringSource(Slice& contents) : - contents_(contents), - force_error_(false), - force_error_position_(0), - force_eof_(false), - force_eof_position_(0), - returned_partial_(false) { } + bool fail_after_read_partial_; + explicit StringSource(Slice& contents, bool fail_after_read_partial) + : contents_(contents), + force_error_(false), + force_error_position_(0), + force_eof_(false), + force_eof_position_(0), + returned_partial_(false), + fail_after_read_partial_(fail_after_read_partial) {} Status Read(size_t n, Slice* result, char* scratch) override { - EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error"; + if (fail_after_read_partial_) { + EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error"; + } if (force_error_) { if (force_error_position_ >= n) { @@ -139,7 +146,7 @@ class LogTest : public ::testing::TestWithParam { } void reset_source_contents() { - auto src = dynamic_cast(reader_.file()->file()); + auto src = dynamic_cast(reader_->file()->file()); assert(src); src->contents_ = dest_contents(); } @@ -149,11 +156,10 @@ class LogTest : public ::testing::TestWithParam { std::unique_ptr source_holder_; ReportCollector report_; Writer writer_; - Reader reader_; + std::unique_ptr reader_; - // Record metadata for testing initial offset functionality - static size_t initial_offset_record_sizes_[]; - uint64_t initial_offset_last_record_offsets_[4]; + protected: + bool allow_retry_read_; public: LogTest() @@ -161,18 +167,18 @@ class LogTest : public ::testing::TestWithParam { dest_holder_(test::GetWritableFileWriter( new test::StringSink(&reader_contents_), "" /* don't care */)), source_holder_(test::GetSequentialFileReader( - new StringSource(reader_contents_), "" /* file name */)), - writer_(std::move(dest_holder_), 123, GetParam()), - reader_(nullptr, std::move(source_holder_), &report_, - true /* checksum */, 123 /* log_number */, - false /* retry_after_eof */) { - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; - initial_offset_last_record_offsets_[0] = 0; - initial_offset_last_record_offsets_[1] = header_size + 10000; - initial_offset_last_record_offsets_[2] = 2 * (header_size + 10000); - initial_offset_last_record_offsets_[3] = 2 * (header_size + 10000) + - (2 * log::kBlockSize - 1000) + - 3 * header_size; + new StringSource(reader_contents_, !std::get<1>(GetParam())), + "" /* file name */)), + writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())), + allow_retry_read_(std::get<1>(GetParam())) { + if (allow_retry_read_) { + reader_.reset(new FragmentBufferedReader( + nullptr, std::move(source_holder_), &report_, true /* checksum */, + 123 /* log_number */)); + } else { + reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_, + true /* checksum */, 123 /* log_number */)); + } } Slice* get_reader_contents() { return &reader_contents_; } @@ -189,7 +195,9 @@ class LogTest : public ::testing::TestWithParam { WALRecoveryMode::kTolerateCorruptedTailRecords) { std::string scratch; Slice record; - if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) { + bool ret = false; + ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode); + if (ret) { return record.ToString(); } else { return "EOF"; @@ -221,7 +229,7 @@ class LogTest : public ::testing::TestWithParam { } void ForceError(size_t position = 0) { - auto src = dynamic_cast(reader_.file()->file()); + auto src = dynamic_cast(reader_->file()->file()); src->force_error_ = true; src->force_error_position_ = position; } @@ -235,20 +243,18 @@ class LogTest : public ::testing::TestWithParam { } void ForceEOF(size_t position = 0) { - auto src = dynamic_cast(reader_.file()->file()); + auto src = dynamic_cast(reader_->file()->file()); src->force_eof_ = true; src->force_eof_position_ = position; } void UnmarkEOF() { - auto src = dynamic_cast(reader_.file()->file()); + auto src = dynamic_cast(reader_->file()->file()); src->returned_partial_ = false; - reader_.UnmarkEOF(); + reader_->UnmarkEOF(); } - bool IsEOF() { - return reader_.IsEOF(); - } + bool IsEOF() { return reader_->IsEOF(); } // Returns OK iff recorded error message contains "msg" std::string MatchError(const std::string& msg) const { @@ -258,23 +264,8 @@ class LogTest : public ::testing::TestWithParam { return "OK"; } } - - void WriteInitialOffsetLog() { - for (int i = 0; i < 4; i++) { - std::string record(initial_offset_record_sizes_[i], - static_cast('a' + i)); - Write(record); - } - } - }; -size_t LogTest::initial_offset_record_sizes_[] = - {10000, // Two sizable records in first block - 10000, - 2 * log::kBlockSize - 1000, // Span three blocks - 1}; - TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); } TEST_P(LogTest, ReadWrite) { @@ -312,7 +303,8 @@ TEST_P(LogTest, Fragmentation) { TEST_P(LogTest, MarginalTrailer) { // Make a trailer that is exactly the same length as an empty record. - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes()); @@ -326,7 +318,8 @@ TEST_P(LogTest, MarginalTrailer) { TEST_P(LogTest, MarginalTrailer2) { // Make a trailer that is exactly the same length as an empty record. - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes()); @@ -339,7 +332,8 @@ TEST_P(LogTest, MarginalTrailer2) { } TEST_P(LogTest, ShortTrailer) { - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size + 4; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes()); @@ -352,7 +346,8 @@ TEST_P(LogTest, ShortTrailer) { } TEST_P(LogTest, AlignedEof) { - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size + 4; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes()); @@ -403,6 +398,11 @@ TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) { } TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then truncated trailing record should not + // raise an error. + return; + } Write("foo"); ShrinkSize(4); // Drop all payload as well as a header byte ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); @@ -412,13 +412,20 @@ TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) { } TEST_P(LogTest, BadLength) { - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + if (allow_retry_read_) { + // If read retry is allowed, then we should not raise an error when the + // record length specified in header is longer than data currently + // available. It's possible that the body of the record is not written yet. + return; + } + bool recyclable_log = (std::get<0>(GetParam()) != 0); + int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; const int kPayloadSize = kBlockSize - header_size; Write(BigString("bar", kPayloadSize)); Write("foo"); // Least significant size byte is stored in header[4]. IncrementByte(4, 1); - if (!GetParam()) { + if (!recyclable_log) { ASSERT_EQ("foo", Read()); ASSERT_EQ(kBlockSize, DroppedBytes()); ASSERT_EQ("OK", MatchError("bad record length")); @@ -428,6 +435,12 @@ TEST_P(LogTest, BadLength) { } TEST_P(LogTest, BadLengthAtEndIsIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then we should not raise an error when the + // record length specified in header is longer than data currently + // available. It's possible that the body of the record is not written yet. + return; + } Write("foo"); ShrinkSize(1); ASSERT_EQ("EOF", Read()); @@ -436,6 +449,12 @@ TEST_P(LogTest, BadLengthAtEndIsIgnored) { } TEST_P(LogTest, BadLengthAtEndIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then we should not raise an error when the + // record length specified in header is longer than data currently + // available. It's possible that the body of the record is not written yet. + return; + } Write("foo"); ShrinkSize(1); ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); @@ -447,7 +466,8 @@ TEST_P(LogTest, ChecksumMismatch) { Write("foooooo"); IncrementByte(0, 14); ASSERT_EQ("EOF", Read()); - if (!GetParam()) { + bool recyclable_log = (std::get<0>(GetParam()) != 0); + if (!recyclable_log) { ASSERT_EQ(14U, DroppedBytes()); ASSERT_EQ("OK", MatchError("checksum mismatch")); } else { @@ -458,8 +478,10 @@ TEST_P(LogTest, ChecksumMismatch) { TEST_P(LogTest, UnexpectedMiddleType) { Write("foo"); - SetByte(6, static_cast(GetParam() ? kRecyclableMiddleType : kMiddleType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte(6, static_cast(recyclable_log ? kRecyclableMiddleType + : kMiddleType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); ASSERT_EQ("OK", MatchError("missing start")); @@ -467,8 +489,10 @@ TEST_P(LogTest, UnexpectedMiddleType) { TEST_P(LogTest, UnexpectedLastType) { Write("foo"); - SetByte(6, static_cast(GetParam() ? kRecyclableLastType : kLastType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte(6, + static_cast(recyclable_log ? kRecyclableLastType : kLastType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); ASSERT_EQ("OK", MatchError("missing start")); @@ -477,8 +501,10 @@ TEST_P(LogTest, UnexpectedLastType) { TEST_P(LogTest, UnexpectedFullType) { Write("foo"); Write("bar"); - SetByte(6, static_cast(GetParam() ? kRecyclableFirstType : kFirstType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte( + 6, static_cast(recyclable_log ? kRecyclableFirstType : kFirstType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ("bar", Read()); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); @@ -488,8 +514,10 @@ TEST_P(LogTest, UnexpectedFullType) { TEST_P(LogTest, UnexpectedFirstType) { Write("foo"); Write(BigString("bar", 100000)); - SetByte(6, static_cast(GetParam() ? kRecyclableFirstType : kFirstType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte( + 6, static_cast(recyclable_log ? kRecyclableFirstType : kFirstType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ(BigString("bar", 100000), Read()); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); @@ -506,6 +534,11 @@ TEST_P(LogTest, MissingLastIsIgnored) { } TEST_P(LogTest, MissingLastIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then truncated trailing record should not + // raise an error. + return; + } Write(BigString("bar", kBlockSize)); // Remove the LAST block, including header. ShrinkSize(14); @@ -524,6 +557,11 @@ TEST_P(LogTest, PartialLastIsIgnored) { } TEST_P(LogTest, PartialLastIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then truncated trailing record should not + // raise an error. + return; + } Write(BigString("bar", kBlockSize)); // Cause a bad record length in the LAST block. ShrinkSize(1); @@ -550,7 +588,8 @@ TEST_P(LogTest, ErrorJoinsRecords) { SetByte(offset, 'x'); } - if (!GetParam()) { + bool recyclable_log = (std::get<0>(GetParam()) != 0); + if (!recyclable_log) { ASSERT_EQ("correct", Read()); ASSERT_EQ("EOF", Read()); size_t dropped = DroppedBytes(); @@ -564,7 +603,8 @@ TEST_P(LogTest, ErrorJoinsRecords) { TEST_P(LogTest, ClearEofSingleBlock) { Write("foo"); Write("bar"); - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + bool recyclable_log = (std::get<0>(GetParam()) != 0); + int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; ForceEOF(3 + header_size + 2); ASSERT_EQ("foo", Read()); UnmarkEOF(); @@ -579,7 +619,8 @@ TEST_P(LogTest, ClearEofSingleBlock) { TEST_P(LogTest, ClearEofMultiBlock) { size_t num_full_blocks = 5; - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + bool recyclable_log = (std::get<0>(GetParam()) != 0); + int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; size_t n = (kBlockSize - header_size) * num_full_blocks + 25; Write(BigString("foo", n)); Write(BigString("bar", n)); @@ -628,7 +669,8 @@ TEST_P(LogTest, ClearEofError2) { } TEST_P(LogTest, Recycle) { - if (!GetParam()) { + bool recyclable_log = (std::get<0>(GetParam()) != 0); + if (!recyclable_log) { return; // test is only valid for recycled logs } Write("foo"); @@ -651,7 +693,11 @@ TEST_P(LogTest, Recycle) { ASSERT_EQ("EOF", Read()); } -INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2)); +INSTANTIATE_TEST_CASE_P(bool, LogTest, + ::testing::Values(std::make_tuple(0, false), + std::make_tuple(0, true), + std::make_tuple(1, false), + std::make_tuple(1, true))); class RetriableLogTest : public ::testing::TestWithParam { private: @@ -677,7 +723,7 @@ class RetriableLogTest : public ::testing::TestWithParam { std::unique_ptr writer_; std::unique_ptr reader_; ReportCollector report_; - std::unique_ptr log_reader_; + std::unique_ptr log_reader_; public: RetriableLogTest() @@ -716,9 +762,9 @@ class RetriableLogTest : public ::testing::TestWithParam { if (s.ok()) { reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_)); assert(reader_ != nullptr); - log_reader_.reset(new Reader(nullptr, std::move(reader_), &report_, - true /* checksum */, 123 /* log_number */, - true /* retry_after_eof */)); + log_reader_.reset(new FragmentBufferedReader( + nullptr, std::move(reader_), &report_, true /* checksum */, + 123 /* log_number */)); assert(log_reader_ != nullptr); } return s; @@ -738,14 +784,17 @@ class RetriableLogTest : public ::testing::TestWithParam { writer_->Sync(true); } - std::string Read() { - auto wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords; + bool TryRead(std::string* result) { + assert(result != nullptr); + result->clear(); std::string scratch; Slice record; - if (log_reader_->ReadRecord(&record, &scratch, wal_recovery_mode)) { - return record.ToString(); + bool r = log_reader_->ReadRecord(&record, &scratch); + if (r) { + result->assign(record.data(), record.size()); + return true; } else { - return "Read error"; + return false; } } }; @@ -754,12 +803,17 @@ TEST_P(RetriableLogTest, TailLog_PartialHeader) { ASSERT_OK(SetupTestEnv()); std::vector remaining_bytes_in_last_record; size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + bool eof = false; SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency( {{"RetriableLogTest::TailLog:AfterPart1", "RetriableLogTest::TailLog:BeforeReadRecord"}, - {"LogReader::ReadMore:FirstEOF", + {"FragmentBufferedLogReader::TryReadMore:FirstEOF", "RetriableLogTest::TailLog:BeforePart2"}}); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "FragmentBufferedLogReader::TryReadMore:FirstEOF", + [&](void* /*arg*/) { eof = true; }); SyncPoint::GetInstance()->EnableProcessing(); size_t delta = header_size - 1; @@ -779,23 +833,30 @@ TEST_P(RetriableLogTest, TailLog_PartialHeader) { std::string record; port::Thread log_reader_thread([&]() { TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord"); - record = Read(); + while (!TryRead(&record)) { + } }); log_reader_thread.join(); log_writer_thread.join(); ASSERT_EQ("foo", record); + ASSERT_TRUE(eof); } TEST_P(RetriableLogTest, TailLog_FullHeader) { ASSERT_OK(SetupTestEnv()); std::vector remaining_bytes_in_last_record; size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + bool eof = false; SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency( {{"RetriableLogTest::TailLog:AfterPart1", "RetriableLogTest::TailLog:BeforeReadRecord"}, - {"LogReader::ReadMore:FirstEOF", + {"FragmentBufferedLogReader::TryReadMore:FirstEOF", "RetriableLogTest::TailLog:BeforePart2"}}); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "FragmentBufferedLogReader::TryReadMore:FirstEOF", + [&](void* /*arg*/) { eof = true; }); SyncPoint::GetInstance()->EnableProcessing(); size_t delta = header_size + 1; @@ -810,18 +871,45 @@ TEST_P(RetriableLogTest, TailLog_FullHeader) { TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1"); TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2"); Write(Slice(part2)); + ASSERT_TRUE(eof); }); std::string record; port::Thread log_reader_thread([&]() { TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord"); - record = Read(); + while (!TryRead(&record)) { + } }); log_reader_thread.join(); log_writer_thread.join(); ASSERT_EQ("foo", record); } +TEST_P(RetriableLogTest, NonBlockingReadFullRecord) { + // Clear all sync point callbacks even if this test does not use sync point. + // It is necessary, otherwise the execute of this test may hit a sync point + // with which a callback is registered. The registered callback may access + // some dead variable, causing segfault. + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + ASSERT_OK(SetupTestEnv()); + size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + size_t delta = header_size - 1; + size_t old_sz = contents().size(); + Encode("foo-bar"); + size_t new_sz = contents().size(); + std::string part1 = contents().substr(old_sz, delta); + std::string part2 = + contents().substr(old_sz + delta, new_sz - old_sz - delta); + Write(Slice(part1)); + std::string record; + ASSERT_FALSE(TryRead(&record)); + ASSERT_TRUE(record.empty()); + Write(Slice(part2)); + ASSERT_TRUE(TryRead(&record)); + ASSERT_EQ("foo-bar", record); +} + INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2)); } // namespace log diff --git a/db/repair.cc b/db/repair.cc index ae74e578c..7b9409a22 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -364,8 +364,7 @@ class Repairer { // propagating bad information (like overly large sequence // numbers). log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter, - true /*enable checksum*/, log, - false /* retry_after_eof */); + true /*enable checksum*/, log); // Initialize per-column family memtables for (auto* cfd : *vset_.GetColumnFamilySet()) { diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 4d6671ef6..4f55a30d3 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -315,8 +315,7 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) { assert(file); currentLogReader_.reset( new log::Reader(options_->info_log, std::move(file), &reporter_, - read_options_.verify_checksums_, logFile->LogNumber(), - false /* retry_after_eof */)); + read_options_.verify_checksums_, logFile->LogNumber())); return Status::OK(); } } // namespace rocksdb diff --git a/db/version_builder.cc b/db/version_builder.cc index 7b45347c1..84e4dc657 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -364,10 +364,10 @@ class VersionBuilder::Rep { CheckConsistency(vstorage); } - void LoadTableHandlers(InternalStats* internal_stats, int max_threads, - bool prefetch_index_and_filter_in_cache, - bool is_initial_load, - const SliceTransform* prefix_extractor) { + Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, + bool prefetch_index_and_filter_in_cache, + bool is_initial_load, + const SliceTransform* prefix_extractor) { assert(table_cache_ != nullptr); size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity(); @@ -394,7 +394,8 @@ class VersionBuilder::Rep { size_t table_cache_usage = table_cache_->get_cache()->GetUsage(); if (table_cache_usage >= load_limit) { - return; + // TODO (yanqin) find a suitable status code. + return Status::OK(); } else { max_load = load_limit - table_cache_usage; } @@ -402,11 +403,15 @@ class VersionBuilder::Rep { // std::vector> files_meta; + std::vector statuses; for (int level = 0; level < num_levels_; level++) { for (auto& file_meta_pair : levels_[level].added_files) { auto* file_meta = file_meta_pair.second; - assert(!file_meta->table_reader_handle); - files_meta.emplace_back(file_meta, level); + // If the file has been opened before, just skip it. + if (!file_meta->table_reader_handle) { + files_meta.emplace_back(file_meta, level); + statuses.emplace_back(Status::OK()); + } if (files_meta.size() >= max_load) { break; } @@ -426,7 +431,7 @@ class VersionBuilder::Rep { auto* file_meta = files_meta[file_idx].first; int level = files_meta[file_idx].second; - table_cache_->FindTable( + statuses[file_idx] = table_cache_->FindTable( env_options_, *(base_vstorage_->InternalComparator()), file_meta->fd, &file_meta->table_reader_handle, prefix_extractor, false /*no_io */, true /* record_read_stats */, @@ -448,6 +453,12 @@ class VersionBuilder::Rep { for (auto& t : threads) { t.join(); } + for (const auto& s : statuses) { + if (!s.ok()) { + return s; + } + } + return Status::OK(); } void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) { @@ -487,14 +498,13 @@ void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { rep_->SaveTo(vstorage); } -void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats, - int max_threads, - bool prefetch_index_and_filter_in_cache, - bool is_initial_load, - const SliceTransform* prefix_extractor) { - rep_->LoadTableHandlers(internal_stats, max_threads, - prefetch_index_and_filter_in_cache, is_initial_load, - prefix_extractor); +Status VersionBuilder::LoadTableHandlers( + InternalStats* internal_stats, int max_threads, + bool prefetch_index_and_filter_in_cache, bool is_initial_load, + const SliceTransform* prefix_extractor) { + return rep_->LoadTableHandlers(internal_stats, max_threads, + prefetch_index_and_filter_in_cache, + is_initial_load, prefix_extractor); } void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level, diff --git a/db/version_builder.h b/db/version_builder.h index d6ee37e08..168301fdd 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -33,10 +33,10 @@ class VersionBuilder { bool CheckConsistencyForNumLevels(); void Apply(VersionEdit* edit); void SaveTo(VersionStorageInfo* vstorage); - void LoadTableHandlers(InternalStats* internal_stats, int max_threads, - bool prefetch_index_and_filter_in_cache, - bool is_initial_load, - const SliceTransform* prefix_extractor); + Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, + bool prefetch_index_and_filter_in_cache, + bool is_initial_load, + const SliceTransform* prefix_extractor); void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f); private: diff --git a/db/version_edit.h b/db/version_edit.h index 229531792..ee6499cdc 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -313,6 +313,7 @@ class VersionEdit { std::string DebugJSON(int edit_num, bool hex_key = false) const; private: + friend class ReactiveVersionSet; friend class VersionSet; friend class Version; diff --git a/db/version_set.cc b/db/version_set.cc index 12c7754b1..5241608df 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -712,6 +712,7 @@ void LevelIterator::InitFileIterator(size_t new_file_index) { } } } +} // anonymous namespace // A wrapper of version builder which references the current version in // constructor and unref it in the destructor. @@ -726,16 +727,14 @@ class BaseReferencedVersionBuilder { version_->Ref(); } ~BaseReferencedVersionBuilder() { - delete version_builder_; version_->Unref(); } - VersionBuilder* version_builder() { return version_builder_; } + VersionBuilder* version_builder() { return version_builder_.get(); } private: - VersionBuilder* version_builder_; + std::unique_ptr version_builder_; Version* version_; }; -} // anonymous namespace Status Version::GetTableProperties(std::shared_ptr* tp, const FileMetaData* file_meta, @@ -2936,7 +2935,7 @@ Status VersionSet::ProcessManifestWrites( } else if (group_start != std::numeric_limits::max()) { group_start = std::numeric_limits::max(); } - LogAndApplyHelper(last_writer->cfd, builder, version, e, mu); + LogAndApplyHelper(last_writer->cfd, builder, e, mu); batch_edits.push_back(e); } } @@ -2990,6 +2989,7 @@ Status VersionSet::ProcessManifestWrites( assert(pending_manifest_file_number_ == 0); if (!descriptor_log_ || manifest_file_size_ > db_options_->max_manifest_file_size) { + TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest"); pending_manifest_file_number_ = NewFileNumber(); batch_edits.back()->SetNextFile(next_file_number_.load()); new_descriptor_log = true; @@ -3098,6 +3098,7 @@ Status VersionSet::ProcessManifestWrites( if (s.ok() && new_descriptor_log) { s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_, db_directory); + TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest"); } if (s.ok()) { @@ -3225,7 +3226,7 @@ Status VersionSet::ProcessManifestWrites( return s; } -// 'datas' is gramatically incorrect. We still use this notation is to indicate +// 'datas' is gramatically incorrect. We still use this notation to indicate // that this variable represents a collection of column_family_data. Status VersionSet::LogAndApply( const autovector& column_family_datas, @@ -3325,8 +3326,8 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { } void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, - VersionBuilder* builder, Version* /*v*/, - VersionEdit* edit, InstrumentedMutex* mu) { + VersionBuilder* builder, VersionEdit* edit, + InstrumentedMutex* mu) { #ifdef NDEBUG (void)cfd; #endif @@ -3353,16 +3354,16 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, builder->Apply(edit); } -Status VersionSet::ApplyOneVersionEdit( +Status VersionSet::ApplyOneVersionEditToBuilder( VersionEdit& edit, const std::unordered_map& name_to_options, std::unordered_map& column_families_not_found, - std::unordered_map& builders, - bool* have_log_number, uint64_t* /* log_number */, - bool* have_prev_log_number, uint64_t* previous_log_number, - bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, - SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, - uint32_t* max_column_family) { + std::unordered_map>& + builders, + bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number, + uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file, + bool* have_last_sequence, SequenceNumber* last_sequence, + uint64_t* min_log_number_to_keep, uint32_t* max_column_family) { // Not found means that user didn't supply that column // family option AND we encountered column family add // record. Once we encounter column family drop record, @@ -3392,14 +3393,14 @@ Status VersionSet::ApplyOneVersionEdit( } else { cfd = CreateColumnFamily(cf_options->second, &edit); cfd->set_initialized(); - builders.insert( - {edit.column_family_, new BaseReferencedVersionBuilder(cfd)}); + builders.insert(std::make_pair( + edit.column_family_, std::unique_ptr( + new BaseReferencedVersionBuilder(cfd)))); } } else if (edit.is_column_family_drop_) { if (cf_in_builders) { auto builder = builders.find(edit.column_family_); assert(builder != builders.end()); - delete builder->second; builders.erase(builder); cfd = column_family_set_->GetColumnFamily(edit.column_family_); assert(cfd != nullptr); @@ -3433,7 +3434,18 @@ Status VersionSet::ApplyOneVersionEdit( assert(builder != builders.end()); builder->second->version_builder()->Apply(&edit); } + return ExtractInfoFromVersionEdit( + cfd, edit, have_log_number, log_number, have_prev_log_number, + previous_log_number, have_next_file, next_file, have_last_sequence, + last_sequence, min_log_number_to_keep, max_column_family); +} +Status VersionSet::ExtractInfoFromVersionEdit( + ColumnFamilyData* cfd, const VersionEdit& edit, bool* have_log_number, + uint64_t* log_number, bool* have_prev_log_number, + uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file, + bool* have_last_sequence, SequenceNumber* last_sequence, + uint64_t* min_log_number_to_keep, uint32_t* max_column_family) { if (cfd != nullptr) { if (edit.has_log_number_) { if (cfd->GetLogNumber() > edit.log_number_) { @@ -3444,6 +3456,7 @@ Status VersionSet::ApplyOneVersionEdit( } else { cfd->SetLogNumber(edit.log_number_); *have_log_number = true; + *log_number = edit.log_number_; } } if (edit.has_comparator_ && @@ -3480,6 +3493,31 @@ Status VersionSet::ApplyOneVersionEdit( return Status::OK(); } +Status VersionSet::GetCurrentManifestPath(std::string* manifest_path) { + assert(manifest_path != nullptr); + std::string fname; + Status s = ReadFileToString(env_, CurrentFileName(dbname_), &fname); + if (!s.ok()) { + return s; + } + if (fname.empty() || fname.back() != '\n') { + return Status::Corruption("CURRENT file does not end with newline"); + } + // remove the trailing '\n' + fname.resize(fname.size() - 1); + FileType type; + bool parse_ok = ParseFileName(fname, &manifest_file_number_, &type); + if (!parse_ok || type != kDescriptorFile) { + return Status::Corruption("CURRENT file corrupted"); + } + *manifest_path = dbname_; + if (dbname_.back() != '/') { + manifest_path->push_back('/'); + } + *manifest_path += fname; + return Status::OK(); +} + Status VersionSet::Recover( const std::vector& column_families, bool read_only) { @@ -3493,43 +3531,28 @@ Status VersionSet::Recover( std::unordered_map column_families_not_found; // Read "CURRENT" file, which contains a pointer to the current manifest file - std::string manifest_filename; - Status s = ReadFileToString( - env_, CurrentFileName(dbname_), &manifest_filename - ); + std::string manifest_path; + Status s = GetCurrentManifestPath(&manifest_path); if (!s.ok()) { return s; } - if (manifest_filename.empty() || - manifest_filename.back() != '\n') { - return Status::Corruption("CURRENT file does not end with newline"); - } - // remove the trailing '\n' - manifest_filename.resize(manifest_filename.size() - 1); - FileType type; - bool parse_ok = - ParseFileName(manifest_filename, &manifest_file_number_, &type); - if (!parse_ok || type != kDescriptorFile) { - return Status::Corruption("CURRENT file corrupted"); - } ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n", - manifest_filename.c_str()); + manifest_path.c_str()); - manifest_filename = dbname_ + "/" + manifest_filename; std::unique_ptr manifest_file_reader; { std::unique_ptr manifest_file; - s = env_->NewSequentialFile(manifest_filename, &manifest_file, + s = env_->NewSequentialFile(manifest_path, &manifest_file, env_->OptimizeForManifestRead(env_options_)); if (!s.ok()) { return s; } manifest_file_reader.reset( - new SequentialFileReader(std::move(manifest_file), manifest_filename)); + new SequentialFileReader(std::move(manifest_file), manifest_path)); } uint64_t current_manifest_file_size; - s = env_->GetFileSize(manifest_filename, ¤t_manifest_file_size); + s = env_->GetFileSize(manifest_path, ¤t_manifest_file_size); if (!s.ok()) { return s; } @@ -3544,7 +3567,8 @@ Status VersionSet::Recover( uint64_t previous_log_number = 0; uint32_t max_column_family = 0; uint64_t min_log_number_to_keep = 0; - std::unordered_map builders; + std::unordered_map> + builders; // add default column family auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); @@ -3559,14 +3583,15 @@ Status VersionSet::Recover( // In recovery, nobody else can access it, so it's fine to set it to be // initialized earlier. default_cfd->set_initialized(); - builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)}); + builders.insert( + std::make_pair(0, std::unique_ptr( + new BaseReferencedVersionBuilder(default_cfd)))); { VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, - true /* checksum */, 0 /* log_number */, - false /* retry_after_eof */); + true /* checksum */, 0 /* log_number */); Slice record; std::string scratch; std::vector replay_buffer; @@ -3597,7 +3622,7 @@ Status VersionSet::Recover( TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup", &edit); for (auto& e : replay_buffer) { - s = ApplyOneVersionEdit( + s = ApplyOneVersionEditToBuilder( e, cf_name_to_options, column_families_not_found, builders, &have_log_number, &log_number, &have_prev_log_number, &previous_log_number, &have_next_file, &next_file, @@ -3618,7 +3643,7 @@ Status VersionSet::Recover( s = Status::Corruption("corrupted atomic group"); break; } - s = ApplyOneVersionEdit( + s = ApplyOneVersionEditToBuilder( edit, cf_name_to_options, column_families_not_found, builders, &have_log_number, &log_number, &have_prev_log_number, &previous_log_number, &have_next_file, &next_file, @@ -3689,7 +3714,7 @@ Status VersionSet::Recover( assert(cfd->initialized()); auto builders_iter = builders.find(cfd->GetID()); assert(builders_iter != builders.end()); - auto* builder = builders_iter->second->version_builder(); + auto builder = builders_iter->second->version_builder(); // unlimited table cache. Pre-load table handle now. // Need to do it out of the mutex. @@ -3725,7 +3750,7 @@ Status VersionSet::Recover( "prev_log_number is %lu," "max_column_family is %u," "min_log_number_to_keep is %lu\n", - manifest_filename.c_str(), (unsigned long)manifest_file_number_, + manifest_path.c_str(), (unsigned long)manifest_file_number_, (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_, (unsigned long)log_number, (unsigned long)prev_log_number_, column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); @@ -3740,10 +3765,6 @@ Status VersionSet::Recover( } } - for (auto& builder : builders) { - delete builder.second; - } - return s; } @@ -3781,8 +3802,7 @@ Status VersionSet::ListColumnFamilies(std::vector* column_families, VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(file_reader), &reporter, - true /* checksum */, 0 /* log_number */, - false /* retry_after_eof */); + true /* checksum */, 0 /* log_number */); Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { @@ -3928,7 +3948,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, uint64_t previous_log_number = 0; int count = 0; std::unordered_map comparators; - std::unordered_map builders; + std::unordered_map> + builders; // add default column family VersionEdit default_cf_edit; @@ -3936,14 +3957,15 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, default_cf_edit.SetColumnFamily(0); ColumnFamilyData* default_cfd = CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit); - builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)}); + builders.insert( + std::make_pair(0, std::unique_ptr( + new BaseReferencedVersionBuilder(default_cfd)))); { VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(file_reader), &reporter, - true /* checksum */, 0 /* log_number */, - false /* retry_after_eof */); + true /* checksum */, 0 /* log_number */); Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { @@ -3978,8 +4000,9 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, } cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit); cfd->set_initialized(); - builders.insert( - {edit.column_family_, new BaseReferencedVersionBuilder(cfd)}); + builders.insert(std::make_pair( + edit.column_family_, std::unique_ptr( + new BaseReferencedVersionBuilder(cfd)))); } else if (edit.is_column_family_drop_) { if (!cf_in_builders) { s = Status::Corruption( @@ -3987,7 +4010,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, break; } auto builder_iter = builders.find(edit.column_family_); - delete builder_iter->second; builders.erase(builder_iter); comparators.erase(edit.column_family_); cfd = column_family_set_->GetColumnFamily(edit.column_family_); @@ -4087,11 +4109,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, delete v; } - // Free builders - for (auto& builder : builders) { - delete builder.second; - } - next_file_number_.store(next_file + 1); last_allocated_sequence_ = last_sequence; last_published_sequence_ = last_sequence; @@ -4583,4 +4600,405 @@ uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) { return total_files_size; } +ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname, + const ImmutableDBOptions* _db_options, + const EnvOptions& _env_options, + Cache* table_cache, + WriteBufferManager* write_buffer_manager, + WriteController* write_controller) + : VersionSet(dbname, _db_options, _env_options, table_cache, + write_buffer_manager, write_controller) {} + +ReactiveVersionSet::~ReactiveVersionSet() {} + +Status ReactiveVersionSet::Recover( + const std::vector& column_families, + std::unique_ptr* manifest_reader, + std::unique_ptr* manifest_reporter, + std::unique_ptr* manifest_reader_status) { + assert(manifest_reader != nullptr); + assert(manifest_reporter != nullptr); + assert(manifest_reader_status != nullptr); + + std::unordered_map cf_name_to_options; + for (const auto& cf : column_families) { + cf_name_to_options.insert({cf.name, cf.options}); + } + + // add default column family + auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); + if (default_cf_iter == cf_name_to_options.end()) { + return Status::InvalidArgument("Default column family not specified"); + } + VersionEdit default_cf_edit; + default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName); + default_cf_edit.SetColumnFamily(0); + ColumnFamilyData* default_cfd = + CreateColumnFamily(default_cf_iter->second, &default_cf_edit); + // In recovery, nobody else can access it, so it's fine to set it to be + // initialized earlier. + default_cfd->set_initialized(); + + bool have_log_number = false; + bool have_prev_log_number = false; + bool have_next_file = false; + bool have_last_sequence = false; + uint64_t next_file = 0; + uint64_t last_sequence = 0; + uint64_t log_number = 0; + uint64_t previous_log_number = 0; + uint32_t max_column_family = 0; + uint64_t min_log_number_to_keep = 0; + std::unordered_map> + builders; + std::unordered_map column_families_not_found; + builders.insert( + std::make_pair(0, std::unique_ptr( + new BaseReferencedVersionBuilder(default_cfd)))); + + manifest_reader_status->reset(new Status()); + manifest_reporter->reset(new LogReporter()); + static_cast(manifest_reporter->get())->status = + manifest_reader_status->get(); + Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader); + log::Reader* reader = manifest_reader->get(); + + int retry = 0; + while (s.ok() && retry < 1) { + assert(reader != nullptr); + Slice record; + std::string scratch; + while (s.ok() && reader->ReadRecord(&record, &scratch)) { + VersionEdit edit; + s = edit.DecodeFrom(record); + if (!s.ok()) { + break; + } + s = ApplyOneVersionEditToBuilder( + edit, cf_name_to_options, column_families_not_found, builders, + &have_log_number, &log_number, &have_prev_log_number, + &previous_log_number, &have_next_file, &next_file, + &have_last_sequence, &last_sequence, &min_log_number_to_keep, + &max_column_family); + } + if (s.ok()) { + bool enough = have_next_file && have_log_number && have_last_sequence; + if (enough) { + for (const auto& cf : column_families) { + auto cfd = column_family_set_->GetColumnFamily(cf.name); + if (cfd == nullptr) { + enough = false; + break; + } + } + } + if (enough) { + for (const auto& cf : column_families) { + auto cfd = column_family_set_->GetColumnFamily(cf.name); + assert(cfd != nullptr); + if (!cfd->IsDropped()) { + auto builder_iter = builders.find(cfd->GetID()); + assert(builder_iter != builders.end()); + auto builder = builder_iter->second->version_builder(); + assert(builder != nullptr); + s = builder->LoadTableHandlers( + cfd->internal_stats(), db_options_->max_file_opening_threads, + false /* prefetch_index_and_filter_in_cache */, + true /* is_initial_load */, + cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + if (!s.ok()) { + enough = false; + if (s.IsPathNotFound()) { + s = Status::OK(); + } + break; + } + } + } + } + if (enough) { + break; + } + } + ++retry; + } + + if (s.ok()) { + if (!have_prev_log_number) { + previous_log_number = 0; + } + column_family_set_->UpdateMaxColumnFamily(max_column_family); + + MarkMinLogNumberToKeep2PC(min_log_number_to_keep); + MarkFileNumberUsed(previous_log_number); + MarkFileNumberUsed(log_number); + + for (auto cfd : *column_family_set_) { + assert(builders.count(cfd->GetID()) > 0); + auto builder = builders[cfd->GetID()]->version_builder(); + if (!builder->CheckConsistencyForNumLevels()) { + s = Status::InvalidArgument( + "db has more levels than options.num_levels"); + break; + } + } + } + + if (s.ok()) { + for (auto cfd : *column_family_set_) { + if (cfd->IsDropped()) { + continue; + } + assert(cfd->initialized()); + auto builders_iter = builders.find(cfd->GetID()); + assert(builders_iter != builders.end()); + auto* builder = builders_iter->second->version_builder(); + + Version* v = new Version(cfd, this, env_options_, + *cfd->GetLatestMutableCFOptions(), + current_version_number_++); + builder->SaveTo(v->storage_info()); + + // Install recovered version + v->PrepareApply(*cfd->GetLatestMutableCFOptions(), + !(db_options_->skip_stats_update_on_db_open)); + AppendVersion(cfd, v); + } + next_file_number_.store(next_file + 1); + last_allocated_sequence_ = last_sequence; + last_published_sequence_ = last_sequence; + last_sequence_ = last_sequence; + prev_log_number_ = previous_log_number; + for (auto cfd : *column_family_set_) { + if (cfd->IsDropped()) { + continue; + } + ROCKS_LOG_INFO(db_options_->info_log, + "Column family [%s] (ID %u), log number is %" PRIu64 "\n", + cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber()); + } + } + return s; +} + +Status ReactiveVersionSet::ReadAndApply( + InstrumentedMutex* mu, + std::unique_ptr* manifest_reader, + std::unordered_set* cfds_changed) { + assert(manifest_reader != nullptr); + assert(cfds_changed != nullptr); + mu->AssertHeld(); + + Status s; + bool have_log_number = false; + bool have_prev_log_number = false; + bool have_next_file = false; + bool have_last_sequence = false; + uint64_t next_file = 0; + uint64_t last_sequence = 0; + uint64_t log_number = 0; + uint64_t previous_log_number = 0; + uint32_t max_column_family = 0; + uint64_t min_log_number_to_keep = 0; + + while (s.ok()) { + Slice record; + std::string scratch; + log::Reader* reader = manifest_reader->get(); + std::string old_manifest_path = reader->file()->file_name(); + while (reader->ReadRecord(&record, &scratch)) { + VersionEdit edit; + s = edit.DecodeFrom(record); + if (!s.ok()) { + break; + } + ColumnFamilyData* cfd = + column_family_set_->GetColumnFamily(edit.column_family_); + // If we cannot find this column family in our column family set, then it + // may be a new column family created by the primary after the secondary + // starts. Ignore it for now. + if (nullptr == cfd) { + continue; + } + if (active_version_builders_.find(edit.column_family_) == + active_version_builders_.end()) { + std::unique_ptr builder_guard( + new BaseReferencedVersionBuilder(cfd)); + active_version_builders_.insert( + std::make_pair(edit.column_family_, std::move(builder_guard))); + } + s = ApplyOneVersionEditToBuilder( + edit, &have_log_number, &log_number, &have_prev_log_number, + &previous_log_number, &have_next_file, &next_file, + &have_last_sequence, &last_sequence, &min_log_number_to_keep, + &max_column_family); + if (!s.ok()) { + break; + } + auto builder_iter = active_version_builders_.find(edit.column_family_); + assert(builder_iter != active_version_builders_.end()); + auto builder = builder_iter->second->version_builder(); + assert(builder != nullptr); + s = builder->LoadTableHandlers( + cfd->internal_stats(), db_options_->max_file_opening_threads, + false /* prefetch_index_and_filter_in_cache */, + false /* is_initial_load */, + cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + TEST_SYNC_POINT_CALLBACK( + "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", &s); + if (!s.ok() && !s.IsPathNotFound()) { + break; + } else if (s.IsPathNotFound()) { + s = Status::OK(); + } else { // s.ok() == true + auto version = new Version(cfd, this, env_options_, + *cfd->GetLatestMutableCFOptions(), + current_version_number_++); + builder->SaveTo(version->storage_info()); + version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true); + AppendVersion(cfd, version); + active_version_builders_.erase(builder_iter); + if (cfds_changed->count(cfd) == 0) { + cfds_changed->insert(cfd); + } + } + if (have_next_file) { + next_file_number_.store(next_file + 1); + } + if (have_last_sequence) { + last_allocated_sequence_ = last_sequence; + last_published_sequence_ = last_sequence; + last_sequence_ = last_sequence; + } + if (have_prev_log_number) { + prev_log_number_ = previous_log_number; + MarkFileNumberUsed(previous_log_number); + } + if (have_log_number) { + MarkFileNumberUsed(log_number); + } + column_family_set_->UpdateMaxColumnFamily(max_column_family); + MarkMinLogNumberToKeep2PC(min_log_number_to_keep); + } + // It's possible that: + // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted. + // 2) we have finished reading the current MANIFEST. + // 3) we have encountered an IOError reading the current MANIFEST. + // We need to look for the next MANIFEST and start from there. If we cannot + // find the next MANIFEST, we should exit the loop. + s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader); + reader = manifest_reader->get(); + if (s.ok() && reader->file()->file_name() == old_manifest_path) { + break; + } + } + + if (s.ok()) { + for (auto cfd : *column_family_set_) { + auto builder_iter = active_version_builders_.find(cfd->GetID()); + if (builder_iter == active_version_builders_.end()) { + continue; + } + auto builder = builder_iter->second->version_builder(); + if (!builder->CheckConsistencyForNumLevels()) { + s = Status::InvalidArgument( + "db has more levels than options.num_levels"); + break; + } + } + } + return s; +} + +Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( + VersionEdit& edit, bool* have_log_number, uint64_t* log_number, + bool* have_prev_log_number, uint64_t* previous_log_number, + bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, + SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, + uint32_t* max_column_family) { + ColumnFamilyData* cfd = nullptr; + Status status; + if (edit.is_column_family_add_) { + // TODO (yanqin) for now the secondary ignores column families created + // after Open. This also simplifies handling of switching to a new MANIFEST + // and processing the snapshot of the system at the beginning of the + // MANIFEST. + return Status::OK(); + } else if (edit.is_column_family_drop_) { + cfd = column_family_set_->GetColumnFamily(edit.column_family_); + // Drop a CF created by primary after secondary starts? Then ignore + if (cfd == nullptr) { + return Status::OK(); + } + // Drop the column family by setting it to be 'dropped' without destroying + // the column family handle. + cfd->SetDropped(); + if (cfd->Unref()) { + delete cfd; + cfd = nullptr; + } + } else { + cfd = column_family_set_->GetColumnFamily(edit.column_family_); + // Operation on a CF created after Open? Then ignore + if (cfd == nullptr) { + return Status::OK(); + } + auto builder_iter = active_version_builders_.find(edit.column_family_); + assert(builder_iter != active_version_builders_.end()); + auto builder = builder_iter->second->version_builder(); + assert(builder != nullptr); + builder->Apply(&edit); + } + return ExtractInfoFromVersionEdit( + cfd, edit, have_log_number, log_number, have_prev_log_number, + previous_log_number, have_next_file, next_file, have_last_sequence, + last_sequence, min_log_number_to_keep, max_column_family); +} + +Status ReactiveVersionSet::MaybeSwitchManifest( + log::Reader::Reporter* reporter, + std::unique_ptr* manifest_reader) { + assert(manifest_reader != nullptr); + Status s; + do { + std::string manifest_path; + s = GetCurrentManifestPath(&manifest_path); + std::unique_ptr manifest_file; + if (s.ok()) { + if (nullptr == manifest_reader->get() || + manifest_reader->get()->file()->file_name() != manifest_path) { + TEST_SYNC_POINT( + "ReactiveVersionSet::MaybeSwitchManifest:" + "AfterGetCurrentManifestPath:0"); + TEST_SYNC_POINT( + "ReactiveVersionSet::MaybeSwitchManifest:" + "AfterGetCurrentManifestPath:1"); + s = env_->NewSequentialFile( + manifest_path, &manifest_file, + env_->OptimizeForManifestRead(env_options_)); + } else { + // No need to switch manifest. + break; + } + } + std::unique_ptr manifest_file_reader; + if (s.ok()) { + manifest_file_reader.reset( + new SequentialFileReader(std::move(manifest_file), manifest_path)); + manifest_reader->reset(new log::FragmentBufferedReader( + nullptr, std::move(manifest_file_reader), reporter, + true /* checksum */, 0 /* log_number */)); + ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n", + manifest_path.c_str()); + // TODO (yanqin) every time we switch to a new MANIFEST, we clear the + // active_version_builders_ map because we choose to construct the + // versions from scratch, thanks to the first part of each MANIFEST + // written by VersionSet::WriteSnapshot. This is not necessary, but we + // choose this at present for the sake of simplicity. + active_version_builders_.clear(); + } + } while (s.IsPathNotFound()); + return s; +} + } // namespace rocksdb diff --git a/db/version_set.h b/db/version_set.h index b91278866..8b50dca76 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -648,6 +648,7 @@ class Version { private: Env* env_; + friend class ReactiveVersionSet; friend class VersionSet; const InternalKeyComparator* internal_comparator() const { @@ -739,9 +740,7 @@ struct ObsoleteFileInfo { } }; -namespace { class BaseReferencedVersionBuilder; -} class VersionSet { public: @@ -749,7 +748,7 @@ class VersionSet { const EnvOptions& env_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller); - ~VersionSet(); + virtual ~VersionSet(); // Apply *edit to the current version to form a new descriptor that // is both saved to persistent state and installed as the new @@ -795,7 +794,7 @@ class VersionSet { // The across-multi-cf batch version. If edit_lists contain more than // 1 version edits, caller must ensure that no edit in the []list is column // family manipulation. - Status LogAndApply( + virtual Status LogAndApply( const autovector& cfds, const autovector& mutable_cf_options_list, const autovector>& edit_lists, @@ -803,6 +802,8 @@ class VersionSet { bool new_descriptor_log = false, const ColumnFamilyOptions* new_cf_options = nullptr); + Status GetCurrentManifestPath(std::string* manifest_filename); + // Recover the last saved descriptor from persistent storage. // If read_only == true, Recover() will not complain if some column families // are not opened @@ -983,11 +984,12 @@ class VersionSet { static uint64_t GetTotalSstFilesSize(Version* dummy_versions); - private: + protected: struct ManifestWriter; friend class Version; friend class DBImpl; + friend class DBImplReadOnly; struct LogReporter : public log::Reader::Reporter { Status* status; @@ -1011,20 +1013,24 @@ class VersionSet { ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options, VersionEdit* edit); - Status ApplyOneVersionEdit( + // REQUIRES db mutex + Status ApplyOneVersionEditToBuilder( VersionEdit& edit, const std::unordered_map& name_to_opts, std::unordered_map& column_families_not_found, - std::unordered_map& builders, + std::unordered_map< + uint32_t, std::unique_ptr>& builders, bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number, uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, uint32_t* max_column_family); - Status ProcessManifestWrites(std::deque& writers, - InstrumentedMutex* mu, Directory* db_directory, - bool new_descriptor_log, - const ColumnFamilyOptions* new_cf_options); + Status ExtractInfoFromVersionEdit( + ColumnFamilyData* cfd, const VersionEdit& edit, bool* have_log_number, + uint64_t* log_number, bool* have_prev_log_number, + uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file, + bool* have_last_sequence, SequenceNumber* last_sequence, + uint64_t* min_log_number_to_keep, uint32_t* max_column_family); std::unique_ptr column_family_set_; @@ -1074,13 +1080,77 @@ class VersionSet { // env options for all reads and writes except compactions EnvOptions env_options_; + private: // No copying allowed VersionSet(const VersionSet&); void operator=(const VersionSet&); + // REQUIRES db mutex at beginning. may release and re-acquire db mutex + Status ProcessManifestWrites(std::deque& writers, + InstrumentedMutex* mu, Directory* db_directory, + bool new_descriptor_log, + const ColumnFamilyOptions* new_cf_options); + void LogAndApplyCFHelper(VersionEdit* edit); - void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, Version* v, + void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, VersionEdit* edit, InstrumentedMutex* mu); }; +class ReactiveVersionSet : public VersionSet { + public: + ReactiveVersionSet(const std::string& dbname, + const ImmutableDBOptions* _db_options, + const EnvOptions& _env_options, Cache* table_cache, + WriteBufferManager* write_buffer_manager, + WriteController* write_controller); + + ~ReactiveVersionSet() override; + + Status ReadAndApply( + InstrumentedMutex* mu, + std::unique_ptr* manifest_reader, + std::unordered_set* cfds_changed); + + Status Recover(const std::vector& column_families, + std::unique_ptr* manifest_reader, + std::unique_ptr* manifest_reporter, + std::unique_ptr* manifest_reader_status); + + protected: + using VersionSet::ApplyOneVersionEditToBuilder; + + // REQUIRES db mutex + Status ApplyOneVersionEditToBuilder( + VersionEdit& edit, bool* have_log_number, uint64_t* log_number, + bool* have_prev_log_number, uint64_t* previous_log_number, + bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, + SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, + uint32_t* max_column_family); + + Status MaybeSwitchManifest( + log::Reader::Reporter* reporter, + std::unique_ptr* manifest_reader); + + private: + std::unordered_map> + active_version_builders_; + + using VersionSet::LogAndApply; + using VersionSet::Recover; + + Status LogAndApply( + const autovector& /*cfds*/, + const autovector& /*mutable_cf_options_list*/, + const autovector>& /*edit_lists*/, + InstrumentedMutex* /*mu*/, Directory* /*db_directory*/, + bool /*new_descriptor_log*/, + const ColumnFamilyOptions* /*new_cf_option*/) override { + return Status::NotSupported("not supported in reactive mode"); + } + + // No copy allowed + ReactiveVersionSet(const ReactiveVersionSet&); + ReactiveVersionSet& operator=(const ReactiveVersionSet&); +}; + } // namespace rocksdb diff --git a/db/wal_manager.cc b/db/wal_manager.cc index 44676a77a..b306df710 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -457,7 +457,7 @@ Status WalManager::ReadFirstLine(const std::string& fname, reporter.status = &status; reporter.ignore_error = !db_options_.paranoid_checks; log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter, - true /*checksum*/, number, false /* retry_after_eof */); + true /*checksum*/, number); std::string scratch; Slice record; diff --git a/env/env_hdfs.cc b/env/env_hdfs.cc index 14fb902f0..7c0e14fe2 100644 --- a/env/env_hdfs.cc +++ b/env/env_hdfs.cc @@ -36,9 +36,11 @@ namespace { // Log error message static Status IOError(const std::string& context, int err_number) { - return (err_number == ENOSPC) ? - Status::NoSpace(context, strerror(err_number)) : - Status::IOError(context, strerror(err_number)); + return (err_number == ENOSPC) + ? Status::NoSpace(context, strerror(err_number)) + : (err_number == ENOENT) + ? Status::PathNotFound(context, strerror(err_number)) + : Status::IOError(context, strerror(err_number)); } // assume that there is one global logger for now. It is not thread-safe, diff --git a/env/io_posix.h b/env/io_posix.h index 106f6df65..e6824d3e8 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -41,6 +41,9 @@ static Status IOError(const std::string& context, const std::string& file_name, strerror(err_number)); case ESTALE: return Status::IOError(Status::kStaleFile); + case ENOENT: + return Status::PathNotFound(IOErrorMsg(context, file_name), + strerror(err_number)); default: return Status::IOError(IOErrorMsg(context, file_name), strerror(err_number)); diff --git a/examples/.gitignore b/examples/.gitignore index b5a05e44a..823664ae1 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -2,6 +2,7 @@ c_simple_example column_families_example compact_files_example compaction_filter_example +multi_processes_example optimistic_transaction_example options_file_example simple_example diff --git a/examples/Makefile b/examples/Makefile index 57cd1a75a..27a6f0f42 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -43,8 +43,11 @@ transaction_example: librocksdb transaction_example.cc options_file_example: librocksdb options_file_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) +multi_processes_example: librocksdb multi_processes_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + clean: - rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example + rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example librocksdb: cd .. && $(MAKE) static_lib diff --git a/examples/multi_processes_example.cc b/examples/multi_processes_example.cc new file mode 100644 index 000000000..b1c1d02ba --- /dev/null +++ b/examples/multi_processes_example.cc @@ -0,0 +1,395 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +// How to use this example +// Open two terminals, in one of them, run `./multi_processes_example 0` to +// start a process running the primary instance. This will create a new DB in +// kDBPath. The process will run for a while inserting keys to the normal +// RocksDB database. +// Next, go to the other terminal and run `./multi_processes_example 1` to +// start a process running the secondary instance. This will create a secondary +// instance following the aforementioned primary instance. This process will +// run for a while, tailing the logs of the primary. After process with primary +// instance exits, this process will keep running until you hit 'CTRL+C'. + +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(OS_LINUX) +#include +#include +#include +#include +#include +#include +#endif // !OS_LINUX + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" + +using rocksdb::ColumnFamilyDescriptor; +using rocksdb::ColumnFamilyHandle; +using rocksdb::ColumnFamilyOptions; +using rocksdb::DB; +using rocksdb::FlushOptions; +using rocksdb::Iterator; +using rocksdb::Options; +using rocksdb::ReadOptions; +using rocksdb::Slice; +using rocksdb::Status; +using rocksdb::WriteOptions; + +const std::string kDBPath = "/tmp/rocksdb_multi_processes_example"; +const std::string kPrimaryStatusFile = + "/tmp/rocksdb_multi_processes_example_primary_status"; +const uint64_t kMaxKey = 600000; +const size_t kMaxValueLength = 256; +const size_t kNumKeysPerFlush = 1000; + +const std::vector& GetColumnFamilyNames() { + static std::vector column_family_names = { + rocksdb::kDefaultColumnFamilyName, "pikachu"}; + return column_family_names; +} + +inline bool IsLittleEndian() { + uint32_t x = 1; + return *reinterpret_cast(&x) != 0; +} + +static std::atomic& ShouldSecondaryWait() { + static std::atomic should_secondary_wait{1}; + return should_secondary_wait; +} + +static std::string Key(uint64_t k) { + std::string ret; + if (IsLittleEndian()) { + ret.append(reinterpret_cast(&k), sizeof(k)); + } else { + char buf[sizeof(k)]; + buf[0] = k & 0xff; + buf[1] = (k >> 8) & 0xff; + buf[2] = (k >> 16) & 0xff; + buf[3] = (k >> 24) & 0xff; + buf[4] = (k >> 32) & 0xff; + buf[5] = (k >> 40) & 0xff; + buf[6] = (k >> 48) & 0xff; + buf[7] = (k >> 56) & 0xff; + ret.append(buf, sizeof(k)); + } + size_t i = 0, j = ret.size() - 1; + while (i < j) { + char tmp = ret[i]; + ret[i] = ret[j]; + ret[j] = tmp; + ++i; + --j; + } + return ret; +} + +static uint64_t Key(std::string key) { + assert(key.size() == sizeof(uint64_t)); + size_t i = 0, j = key.size() - 1; + while (i < j) { + char tmp = key[i]; + key[i] = key[j]; + key[j] = tmp; + ++i; + --j; + } + uint64_t ret = 0; + if (IsLittleEndian()) { + memcpy(&ret, key.c_str(), sizeof(uint64_t)); + } else { + const char* buf = key.c_str(); + ret |= static_cast(buf[0]); + ret |= (static_cast(buf[1]) << 8); + ret |= (static_cast(buf[2]) << 16); + ret |= (static_cast(buf[3]) << 24); + ret |= (static_cast(buf[4]) << 32); + ret |= (static_cast(buf[5]) << 40); + ret |= (static_cast(buf[6]) << 48); + ret |= (static_cast(buf[7]) << 56); + } + return ret; +} + +static Slice GenerateRandomValue(const size_t max_length, char scratch[]) { + size_t sz = 1 + (std::rand() % max_length); + int rnd = std::rand(); + for (size_t i = 0; i != sz; ++i) { + scratch[i] = static_cast(rnd ^ i); + } + return Slice(scratch, sz); +} + +static bool ShouldCloseDB() { return true; } + +// TODO: port this example to other systems. It should be straightforward for +// POSIX-compliant systems. +#if defined(OS_LINUX) +void CreateDB() { + long my_pid = static_cast(getpid()); + Options options; + Status s = rocksdb::DestroyDB(kDBPath, options); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid, + s.ToString().c_str()); + assert(false); + } + options.create_if_missing = true; + DB* db = nullptr; + s = DB::Open(options, kDBPath, &db); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid, + s.ToString().c_str()); + assert(false); + } + std::vector handles; + ColumnFamilyOptions cf_opts(options); + for (const auto& cf_name : GetColumnFamilyNames()) { + if (rocksdb::kDefaultColumnFamilyName != cf_name) { + ColumnFamilyHandle* handle = nullptr; + s = db->CreateColumnFamily(cf_opts, cf_name, &handle); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid, + cf_name.c_str(), s.ToString().c_str()); + assert(false); + } + handles.push_back(handle); + } + } + fprintf(stdout, "[process %ld] Column families created\n", my_pid); + for (auto h : handles) { + delete h; + } + handles.clear(); + delete db; +} + +void RunPrimary() { + long my_pid = static_cast(getpid()); + fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid); + CreateDB(); + std::srand(time(nullptr)); + DB* db = nullptr; + Options options; + options.create_if_missing = false; + std::vector column_families; + for (const auto& cf_name : GetColumnFamilyNames()) { + column_families.push_back(ColumnFamilyDescriptor(cf_name, options)); + } + std::vector handles; + WriteOptions write_opts; + char val_buf[kMaxValueLength] = {0}; + uint64_t curr_key = 0; + while (curr_key < kMaxKey) { + Status s; + if (nullptr == db) { + s = DB::Open(options, kDBPath, column_families, &handles, &db); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid, + s.ToString().c_str()); + assert(false); + } + } + assert(nullptr != db); + assert(handles.size() == GetColumnFamilyNames().size()); + for (auto h : handles) { + assert(nullptr != h); + for (size_t i = 0; i != kNumKeysPerFlush; ++i) { + Slice key = Key(curr_key + static_cast(i)); + Slice value = GenerateRandomValue(kMaxValueLength, val_buf); + s = db->Put(write_opts, h, key, value); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to insert\n", my_pid); + assert(false); + } + } + s = db->Flush(FlushOptions(), h); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to flush\n", my_pid); + assert(false); + } + } + curr_key += static_cast(kNumKeysPerFlush); + if (ShouldCloseDB()) { + for (auto h : handles) { + delete h; + } + handles.clear(); + delete db; + db = nullptr; + } + } + if (nullptr != db) { + for (auto h : handles) { + delete h; + } + handles.clear(); + delete db; + db = nullptr; + } + fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid); +} + +void secondary_instance_sigint_handler(int signal) { + ShouldSecondaryWait().store(0, std::memory_order_relaxed); + fprintf(stdout, "\n"); + fflush(stdout); +}; + +void RunSecondary() { + ::signal(SIGINT, secondary_instance_sigint_handler); + long my_pid = static_cast(getpid()); + const std::string kSecondaryPath = + "/tmp/rocksdb_multi_processes_example_secondary"; + // Create directory if necessary + if (nullptr == opendir(kSecondaryPath.c_str())) { + int ret = + mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + if (ret < 0) { + perror("failed to create directory for secondary instance"); + exit(0); + } + } + DB* db = nullptr; + Options options; + options.create_if_missing = false; + options.max_open_files = -1; + Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n", + my_pid, s.ToString().c_str()); + assert(false); + } else { + fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid); + } + + ReadOptions ropts; + ropts.verify_checksums = true; + ropts.total_order_seek = true; + + std::vector test_threads; + test_threads.emplace_back([&]() { + while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { + std::unique_ptr iter(db->NewIterator(ropts)); + iter->SeekToFirst(); + size_t count = 0; + for (; iter->Valid(); iter->Next()) { + ++count; + } + } + fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid); + }); + + test_threads.emplace_back([&]() { + std::srand(time(nullptr)); + while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { + Slice key = Key(std::rand() % kMaxKey); + std::string value; + db->Get(ropts, key, &value); + } + fprintf(stdout, "[process %ld] Point lookup thread finished\n"); + }); + + uint64_t curr_key = 0; + while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { + s = db->TryCatchUpWithPrimary(); + if (!s.ok()) { + fprintf(stderr, + "[process %ld] error while trying to catch up with " + "primary %s\n", + my_pid, s.ToString().c_str()); + assert(false); + } + { + std::unique_ptr iter(db->NewIterator(ropts)); + if (!iter) { + fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid); + assert(false); + } + iter->SeekToLast(); + if (iter->Valid()) { + uint64_t curr_max_key = Key(iter->key().ToString()); + if (curr_max_key != curr_key) { + fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid, + curr_key); + curr_key = curr_max_key; + } + } + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + s = db->TryCatchUpWithPrimary(); + if (!s.ok()) { + fprintf(stderr, + "[process %ld] error while trying to catch up with " + "primary %s\n", + my_pid, s.ToString().c_str()); + assert(false); + } + + std::vector column_families; + for (const auto& cf_name : GetColumnFamilyNames()) { + column_families.push_back(ColumnFamilyDescriptor(cf_name, options)); + } + std::vector handles; + DB* verification_db = nullptr; + s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles, + &verification_db); + assert(s.ok()); + Iterator* iter1 = verification_db->NewIterator(ropts); + iter1->SeekToFirst(); + + Iterator* iter = db->NewIterator(ropts); + iter->SeekToFirst(); + for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) { + if (iter->key().ToString() != iter1->key().ToString()) { + fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n", + Key(iter->key().ToString()), Key(iter1->key().ToString())); + assert(false); + } else if (iter->value().ToString() != iter1->value().ToString()) { + fprintf(stderr, "Value mismatch\n"); + assert(false); + } + } + fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid); + for (auto& thr : test_threads) { + thr.join(); + } + delete iter; + delete iter1; + delete db; + delete verification_db; +} + +int main(int argc, char** argv) { + if (argc < 2) { + fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]); + return 0; + } + if (atoi(argv[1]) == 0) { + RunPrimary(); + } else { + RunSecondary(); + } + return 0; +} +#else // OS_LINUX +int main() { + fpritnf(stderr, "Not implemented.\n"); + return 0; +} +#endif // !OS_LINUX diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 9d5316546..b9e727479 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -162,6 +162,54 @@ class DB { std::vector* handles, DB** dbptr, bool error_if_log_file_exist = false); + // The following OpenAsSecondary functions create a secondary instance that + // can dynamically tail the MANIFEST of a primary that must have already been + // created. User can call TryCatchUpWithPrimary to make the secondary + // instance catch up with primary (WAL tailing is NOT supported now) whenever + // the user feels necessary. Column families created by the primary after the + // secondary instance starts are currently ignored by the secondary instance. + // Column families opened by secondary and dropped by the primary will be + // dropped by secondary as well. However the user of the secondary instance + // can still access the data of such dropped column family as long as they + // do not destroy the corresponding column family handle. + // WAL tailing is not supported at present, but will arrive soon. + // + // The options argument specifies the options to open the secondary instance. + // The name argument specifies the name of the primary db that you have used + // to open the primary instance. + // The secondary_path argument points to a directory where the secondary + // instance stores its info log. + // The dbptr is an out-arg corresponding to the opened secondary instance. + // The pointer points to a heap-allocated database, and the user should + // delete it after use. + // Open DB as secondary instance with only the default column family. + // Return OK on success, non-OK on failures. + static Status OpenAsSecondary(const Options& options, const std::string& name, + const std::string& secondary_path, DB** dbptr); + + // Open DB as secondary instance with column families. You can open a subset + // of column families in secondary mode. + // The db_options specify the database specific options. + // The name argument specifies the name of the primary db that you have used + // to open the primary instance. + // The secondary_path argument points to a directory where the secondary + // instance stores its info log. + // The column_families argument specifieds a list of column families to open. + // If any of the column families does not exist, the function returns non-OK + // status. + // The handles is an out-arg corresponding to the opened database column + // familiy handles. + // The dbptr is an out-arg corresponding to the opened secondary instance. + // The pointer points to a heap-allocated database, and the caller should + // delete it after use. Before deleting the dbptr, the user should also + // delete the pointers stored in handles vector. + // Return OK on success, on-OK on failures. + static Status OpenAsSecondary( + const DBOptions& db_options, const std::string& name, + const std::string& secondary_path, + const std::vector& column_families, + std::vector* handles, DB** dbptr); + // Open DB with column families. // db_options specify database specific options // column_families is the vector of all column families in the database, @@ -1235,6 +1283,23 @@ class DB { return Status::NotSupported("GetStatsHistory() is not implemented."); } +#ifndef ROCKSDB_LITE + // Make the secondary instance catch up with the primary by tailing and + // replaying the MANIFEST and WAL of the primary. + // Column families created by the primary after the secondary instance starts + // will be ignored unless the secondary instance closes and restarts with the + // newly created column families. + // Column families that exist before secondary instance starts and dropped by + // the primary afterwards will be marked as dropped. However, as long as the + // secondary instance does not delete the corresponding column family + // handles, the data of the column family is still accessible to the + // secondary. + // TODO: we will support WAL tailing soon. + virtual Status TryCatchUpWithPrimary() { + return Status::NotSupported("Supported only by secondary instance"); + } +#endif // !ROCKSDB_LITE + private: // No copying allowed DB(const DB&); diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 40b374ecf..f8f66bf42 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -73,6 +73,7 @@ class Status { kStaleFile = 6, kMemoryLimit = 7, kSpaceLimit = 8, + kPathNotFound = 9, kMaxSubCode }; @@ -198,6 +199,11 @@ class Status { return Status(kIOError, kSpaceLimit, msg, msg2); } + static Status PathNotFound() { return Status(kIOError, kPathNotFound); } + static Status PathNotFound(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kIOError, kPathNotFound, msg, msg2); + } + // Returns true iff the status indicates success. bool ok() const { return code() == kOk; } @@ -266,6 +272,14 @@ class Status { return (code() == kAborted) && (subcode() == kMemoryLimit); } + // Returns true iff the status indicates a PathNotFound error + // This is caused by an I/O error returning the specific "no such file or + // directory" error condition. A PathNotFound error is an I/O error with + // a specific subcode, enabling users to take appropriate action if necessary + bool IsPathNotFound() const { + return (code() == kIOError) && (subcode() == kPathNotFound); + } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; diff --git a/port/win/io_win.h b/port/win/io_win.h index c46876b8c..1c9d803b1 100644 --- a/port/win/io_win.h +++ b/port/win/io_win.h @@ -27,7 +27,9 @@ std::string GetWindowsErrSz(DWORD err); inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) { return ((err == ERROR_HANDLE_DISK_FULL) || (err == ERROR_DISK_FULL)) ? Status::NoSpace(context, GetWindowsErrSz(err)) - : Status::IOError(context, GetWindowsErrSz(err)); + : ((err == ERROR_FILE_NOT_FOUND) || (err == ERROR_PATH_NOT_FOUND)) + ? Status::PathNotFound(context, GetWindowsErrSz(err)) + : Status::IOError(context, GetWindowsErrSz(err)); } inline Status IOErrorFromLastWindowsError(const std::string& context) { @@ -37,7 +39,9 @@ inline Status IOErrorFromLastWindowsError(const std::string& context) { inline Status IOError(const std::string& context, int err_number) { return (err_number == ENOSPC) ? Status::NoSpace(context, strerror(err_number)) - : Status::IOError(context, strerror(err_number)); + : (err_number == ENOENT) + ? Status::PathNotFound(context, strerror(err_number)) + : Status::IOError(context, strerror(err_number)); } class WinFileData; @@ -426,9 +430,7 @@ public: class WinDirectory : public Directory { HANDLE handle_; public: - explicit - WinDirectory(HANDLE h) noexcept : - handle_(h) { + explicit WinDirectory(HANDLE h) noexcept : handle_(h) { assert(handle_ != INVALID_HANDLE_VALUE); } ~WinDirectory() { diff --git a/src.mk b/src.mk index 728332905..55b4e3427 100644 --- a/src.mk +++ b/src.mk @@ -22,6 +22,7 @@ LIB_SOURCES = \ db/db_impl_files.cc \ db/db_impl_open.cc \ db/db_impl_readonly.cc \ + db/db_impl_secondary.cc \ db/db_impl_write.cc \ db/db_info_dumper.cc \ db/db_iter.cc \ @@ -279,6 +280,7 @@ MAIN_SOURCES = \ db/db_options_test.cc \ db/db_properties_test.cc \ db/db_range_del_test.cc \ + db/db_secondary_test.cc \ db/db_sst_test.cc \ db/db_statistics_test.cc \ db/db_table_properties_test.cc \ diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 82cb763e6..9bf5f4629 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -2014,8 +2014,7 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header, log_number = 0; } log::Reader reader(options.info_log, std::move(wal_file_reader), &reporter, - true /* checksum */, log_number, - false /* retry_after_eof */); + true /* checksum */, log_number); std::string scratch; WriteBatch batch; Slice record; diff --git a/util/status.cc b/util/status.cc index 5b3dcf8e9..c66bf6f8e 100644 --- a/util/status.cc +++ b/util/status.cc @@ -41,7 +41,8 @@ static const char* msgs[static_cast(Status::kMaxSubCode)] = { "Deadlock", // kDeadlock "Stale file handle", // kStaleFile "Memory limit reached", // kMemoryLimit - "Space limit reached" // kSpaceLimit + "Space limit reached", // kSpaceLimit + "No such file or directory", // kPathNotFound }; Status::Status(Code _code, SubCode _subcode, const Slice& msg,