diff --git a/HISTORY.md b/HISTORY.md index 9bbcc226f..9672d0196 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ ### Bug Fixes * Use thread-safe `strerror_r()` to get error messages. +* Made BackupEngine thread-safe and added documentation comments to clarify what is safe for multiple BackupEngine objects accessing the same backup directory. ### Performance Improvements * On ARM platform, use `yield` instead of `wfe` to relax cpu to gain better performance. diff --git a/include/rocksdb/utilities/backupable_db.h b/include/rocksdb/utilities/backupable_db.h index 6f8a7cda6..fdbd27769 100644 --- a/include/rocksdb/utilities/backupable_db.h +++ b/include/rocksdb/utilities/backupable_db.h @@ -336,20 +336,12 @@ class BackupStatistics { uint32_t number_fail_backup; }; -// A backup engine for accessing information about backups and restoring from -// them. -// BackupEngineReadOnly is not extensible. -class BackupEngineReadOnly { +// Read-only functions of a BackupEngine. (Restore writes to another directory +// not the backup directory.) See BackupEngine comments for details on +// safe concurrent operations. +class BackupEngineReadOnlyBase { public: - virtual ~BackupEngineReadOnly() {} - - static Status Open(const BackupableDBOptions& options, Env* db_env, - BackupEngineReadOnly** backup_engine_ptr); - // keep for backward compatibility. - static Status Open(Env* db_env, const BackupableDBOptions& options, - BackupEngineReadOnly** backup_engine_ptr) { - return BackupEngineReadOnly::Open(options, db_env, backup_engine_ptr); - } + virtual ~BackupEngineReadOnlyBase() {} // Returns info about backups in backup_info // You can GetBackupInfo safely, even with other BackupEngine performing @@ -359,36 +351,36 @@ class BackupEngineReadOnly { virtual void GetBackupInfo(std::vector* backup_info, bool include_file_details = false) const = 0; - // Returns info about corrupt backups in corrupt_backups + // Returns info about corrupt backups in corrupt_backups. 
+ // WARNING: Any write to the BackupEngine could trigger automatic + // GarbageCollect(), which could delete files that would be needed to + // manually recover a corrupt backup or to preserve an unrecognized (e.g. + // incompatible future version) backup. virtual void GetCorruptedBackups( std::vector<BackupID>* corrupt_backup_ids) const = 0; - // Restoring DB from backup is NOT safe when there is another BackupEngine - // running that might call DeleteBackup() or PurgeOldBackups(). It is caller's - // responsibility to synchronize the operation, i.e. don't delete the backup - // when you're restoring from it - // See also the corresponding doc in BackupEngine + // Restore to specified db_dir and wal_dir from backup_id. virtual Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id, const std::string& db_dir, - const std::string& wal_dir) = 0; + const std::string& wal_dir) const = 0; // keep for backward compatibility. virtual Status RestoreDBFromBackup( BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& options = RestoreOptions()) { + const RestoreOptions& options = RestoreOptions()) const { return RestoreDBFromBackup(options, backup_id, db_dir, wal_dir); } - // See the corresponding doc in BackupEngine - virtual Status RestoreDBFromLatestBackup(const RestoreOptions& options, - const std::string& db_dir, - const std::string& wal_dir) = 0; + // Like RestoreDBFromBackup but restores from latest non-corrupt backup_id + virtual Status RestoreDBFromLatestBackup( + const RestoreOptions& options, const std::string& db_dir, + const std::string& wal_dir) const = 0; // keep for backward compatibility. 
virtual Status RestoreDBFromLatestBackup( const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& options = RestoreOptions()) { + const RestoreOptions& options = RestoreOptions()) const { return RestoreDBFromLatestBackup(options, db_dir, wal_dir); } @@ -408,25 +400,15 @@ class BackupEngineReadOnly { // // Returns Status::OK() if all checks are good virtual Status VerifyBackup(BackupID backup_id, - bool verify_with_checksum = false) = 0; + bool verify_with_checksum = false) const = 0; }; -// A backup engine for creating new backups. -// BackupEngine is not extensible. -class BackupEngine { +// Append-only functions of a BackupEngine. See BackupEngine comment for +// details on distinction between Append and Write operations and safe +// concurrent operations. +class BackupEngineAppendOnlyBase { public: - virtual ~BackupEngine() {} - - // BackupableDBOptions have to be the same as the ones used in previous - // BackupEngines for the same backup directory. - static Status Open(const BackupableDBOptions& options, Env* db_env, - BackupEngine** backup_engine_ptr); - - // keep for backward compatibility. - static Status Open(Env* db_env, const BackupableDBOptions& options, - BackupEngine** backup_engine_ptr) { - return BackupEngine::Open(options, db_env, backup_engine_ptr); - } + virtual ~BackupEngineAppendOnlyBase() {} // same as CreateNewBackup, but stores extra application metadata. 
virtual Status CreateNewBackupWithMetadata( @@ -443,8 +425,7 @@ class BackupEngine { return CreateNewBackupWithMetadata(options, db, app_metadata); } - // Captures the state of the database in the latest backup - // NOT a thread safe call + // Captures the state of the database by creating a new (latest) backup virtual Status CreateNewBackup(const CreateBackupOptions& options, DB* db) { return CreateNewBackupWithMetadata(options, db, ""); } @@ -459,16 +440,6 @@ class BackupEngine { return CreateNewBackup(options, db); } - // Deletes old backups, keeping latest num_backups_to_keep alive. - // See also DeleteBackup. - virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0; - - // Deletes a specific backup. If this operation (or PurgeOldBackups) - // is not completed due to crash, power failure, etc. the state - // will be cleaned up the next time you call DeleteBackup, - // PurgeOldBackups, or GarbageCollect. - virtual Status DeleteBackup(BackupID backup_id) = 0; - // Call this from another thread if you want to stop the backup // that is currently happening. It will return immediately, will // not wait for the backup to stop. @@ -478,69 +449,115 @@ class BackupEngine { // next time you call CreateNewBackup or GarbageCollect. virtual void StopBackup() = 0; - // Returns info about backups in backup_info - virtual void GetBackupInfo(std::vector* backup_info, - bool include_file_details = false) const = 0; + // Will delete any files left over from incomplete creation or deletion of + // a backup. This is not normally needed as those operations also clean up + // after prior incomplete calls to the same kind of operation (create or + // delete). This does not delete corrupt backups but can delete files that + // would be needed to manually recover a corrupt backup or to preserve an + // unrecognized (e.g. incompatible future version) backup. 
+ // NOTE: This is not designed to delete arbitrary files added to the backup + // directory outside of BackupEngine, and clean-up is always subject to + // permissions on and availability of the underlying filesystem. + // NOTE2: For concurrency and interference purposes (see BackupEngine + // comment), GarbageCollect (GC) is like other Append operations, even + // though it seems different. Although GC can delete physical data, it does + // not delete any logical data read by Read operations. GC can interfere + // with Append or Write operations in another BackupEngine on the same + // backup_dir, because temporary files will be treated as obsolete and + // deleted. + virtual Status GarbageCollect() = 0; +}; - // Returns info about corrupt backups in corrupt_backups - virtual void GetCorruptedBackups( - std::vector* corrupt_backup_ids) const = 0; +// A backup engine for organizing and managing backups. +// This class is not user-extensible. +// +// This class declaration adds "Write" operations in addition to the +// operations from BackupEngineAppendOnlyBase and BackupEngineReadOnlyBase. +// +// # Concurrency between threads on the same BackupEngine* object +// +// As of version 6.20, BackupEngine* operations are generally thread-safe, +// using a read-write lock, though single-thread operation is still +// recommended to avoid TOCTOU bugs. Specifically, particular kinds of +// concurrent operations behave like this: +// +// op1\op2| Read | Append | Write +// -------|-------|--------|-------- +// Read | conc | block | block +// Append | block | block | block +// Write | block | block | block +// +// conc = operations safely proceed concurrently +// block = one of the operations safely blocks until the other completes. +// There is generally no guarantee as to which completes first. +// +// StopBackup is the only operation that affects an ongoing operation. 
+// +// # Interleaving operations between BackupEngine* objects open on the +// same backup_dir +// +// It is recommended only to have one BackupEngine* object open for a given +// backup_dir, but it is possible to mix / interleave some operations +// (regardless of whether they are concurrent) with these caveats: +// +// op1\op2| Open | Read | Append | Write +// -------|--------|--------|--------|-------- +// Open | conc | conc | atomic | unspec +// Read | conc | conc | old | unspec +// Append | atomic | old | unspec | unspec +// Write | unspec | unspec | unspec | unspec +// +// Special case: Open with destroy_old_data=true is really a Write +// +// conc = operations safely proceed, concurrently when applicable +// atomic = operations are effectively atomic; if a concurrent Append +// operation has not completed at some key point during Open, the +// opened BackupEngine* will never see the result of the Append op. +// old = Read operations do not include any state changes from other +// BackupEngine* objects; they return the state at their Open time. +// unspec = Behavior is unspecified, including possibly trashing the +// backup_dir, but is "memory safe" (no C++ undefined behavior) +// +class BackupEngine : public BackupEngineReadOnlyBase, + public BackupEngineAppendOnlyBase { + public: + virtual ~BackupEngine() {} - // restore from backup with backup_id - // IMPORTANT -- if options_.share_table_files == true, - // options_.share_files_with_checksum == false, you restore DB from some - // backup that is not the latest, and you start creating new backups from the - // new DB, they will probably fail. - // - // Example: Let's say you have backups 1, 2, 3, 4, 5 and you restore 3. - // If you add new data to the DB and try creating a new backup now, the - // database will diverge from backups 4 and 5 and the new backup will fail. - // If you want to create new backup, you will first have to delete backups 4 - // and 5. 
- virtual Status RestoreDBFromBackup(const RestoreOptions& options, - BackupID backup_id, - const std::string& db_dir, - const std::string& wal_dir) = 0; + // BackupableDBOptions have to be the same as the ones used in previous + // BackupEngines for the same backup directory. + static Status Open(const BackupableDBOptions& options, Env* db_env, + BackupEngine** backup_engine_ptr); // keep for backward compatibility. - virtual Status RestoreDBFromBackup( - BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& options = RestoreOptions()) { - return RestoreDBFromBackup(options, backup_id, db_dir, wal_dir); + static Status Open(Env* db_env, const BackupableDBOptions& options, + BackupEngine** backup_engine_ptr) { + return BackupEngine::Open(options, db_env, backup_engine_ptr); } - // restore from the latest backup - virtual Status RestoreDBFromLatestBackup(const RestoreOptions& options, - const std::string& db_dir, - const std::string& wal_dir) = 0; + // Deletes old backups, keeping latest num_backups_to_keep alive. + // See also DeleteBackup. + virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0; - // keep for backward compatibility. - virtual Status RestoreDBFromLatestBackup( - const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& options = RestoreOptions()) { - return RestoreDBFromLatestBackup(options, db_dir, wal_dir); - } + // Deletes a specific backup. If this operation (or PurgeOldBackups) + // is not completed due to crash, power failure, etc. the state + // will be cleaned up the next time you call DeleteBackup, + // PurgeOldBackups, or GarbageCollect. + virtual Status DeleteBackup(BackupID backup_id) = 0; +}; - // If verify_with_checksum is true, this function - // inspects the current checksums and file sizes of backup files to see if - // they match our expectation. 
- // - // If verify_with_checksum is false, this function - // checks that each file exists and that the size of the file matches our - // expectation. It does not check file checksum. - // - // Returns Status::OK() if all checks are good - virtual Status VerifyBackup(BackupID backup_id, - bool verify_with_checksum = false) = 0; +// A variant of BackupEngine that only allows "Read" operations. See +// BackupEngine comment for details. This class is not user-extensible. +class BackupEngineReadOnly : public BackupEngineReadOnlyBase { + public: + virtual ~BackupEngineReadOnly() {} - // Will delete any files left over from incomplete creation or deletion of - // a backup. This is not normally needed as those operations also clean up - // after prior incomplete calls to the same kind of operation (create or - // delete). - // NOTE: This is not designed to delete arbitrary files added to the backup - // directory outside of BackupEngine, and clean-up is always subject to - // permissions on and availability of the underlying filesystem. - virtual Status GarbageCollect() = 0; + static Status Open(const BackupableDBOptions& options, Env* db_env, + BackupEngineReadOnly** backup_engine_ptr); + // keep for backward compatibility. 
+ static Status Open(Env* db_env, const BackupableDBOptions& options, + BackupEngineReadOnly** backup_engine_ptr) { + return BackupEngineReadOnly::Open(options, db_env, backup_engine_ptr); + } }; } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index e53e978e9..325d76e6e 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -50,6 +50,8 @@ namespace ROCKSDB_NAMESPACE { namespace { using ShareFilesNaming = BackupableDBOptions::ShareFilesNaming; +constexpr BackupID kLatestBackupIDMarker = static_cast<BackupID>(-2); + inline uint32_t ChecksumHexToInt32(const std::string& checksum_hex) { std::string checksum_str; Slice(checksum_hex).DecodeHex(&checksum_str); @@ -108,49 +110,43 @@ void BackupableDBOptions::Dump(Logger* logger) const { } // -------- BackupEngineImpl class --------- -class BackupEngineImpl : public BackupEngine { +class BackupEngineImpl { public: BackupEngineImpl(const BackupableDBOptions& options, Env* db_env, bool read_only = false); - ~BackupEngineImpl() override; + ~BackupEngineImpl(); - using BackupEngine::CreateNewBackupWithMetadata; Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db, - const std::string& app_metadata) override; + const std::string& app_metadata); - Status PurgeOldBackups(uint32_t num_backups_to_keep) override; + Status PurgeOldBackups(uint32_t num_backups_to_keep); - Status DeleteBackup(BackupID backup_id) override; + Status DeleteBackup(BackupID backup_id); - void StopBackup() override { - stop_backup_.store(true, std::memory_order_release); - } + void StopBackup() { stop_backup_.store(true, std::memory_order_release); } - Status GarbageCollect() override; + Status GarbageCollect(); // The returned BackupInfos are in chronological order, which means the // latest backup comes last. 
void GetBackupInfo(std::vector* backup_info, - bool include_file_details) const override; + bool include_file_details) const; - void GetCorruptedBackups( - std::vector* corrupt_backup_ids) const override; + void GetCorruptedBackups(std::vector* corrupt_backup_ids) const; - using BackupEngine::RestoreDBFromBackup; Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id, const std::string& db_dir, - const std::string& wal_dir) override; + const std::string& wal_dir) const; - using BackupEngine::RestoreDBFromLatestBackup; Status RestoreDBFromLatestBackup(const RestoreOptions& options, const std::string& db_dir, - const std::string& wal_dir) override { - return RestoreDBFromBackup(options, latest_valid_backup_id_, db_dir, - wal_dir); + const std::string& wal_dir) const { + // Note: don't read latest_valid_backup_id_ outside of lock + return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir); } Status VerifyBackup(BackupID backup_id, - bool verify_with_checksum = false) override; + bool verify_with_checksum = false) const; Status Initialize(); @@ -164,14 +160,15 @@ class BackupEngineImpl : public BackupEngine { } private: - void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0); - Status DeleteBackupInternal(BackupID backup_id); + void DeleteChildren(const std::string& dir, + uint32_t file_type_filter = 0) const; + Status DeleteBackupNoGC(BackupID backup_id); // Extends the "result" map with pathname->size mappings for the contents of // "dir" in "env". Pathnames are prefixed with "dir". 
- Status InsertPathnameToSizeBytes( + Status ReadChildFileCurrentSizes( const std::string& dir, Env* env, - std::unordered_map* result); + std::unordered_map* result) const; struct FileInfo { FileInfo(const std::string& fname, uint64_t sz, const std::string& checksum, @@ -219,7 +216,13 @@ class BackupEngineImpl : public BackupEngine { ~BackupMeta() {} - Status RecordTimestamp() { return env_->GetCurrentTime(×tamp_); } + void RecordTimestamp() { + // Best effort + Status s = env_->GetCurrentTime(×tamp_); + if (!s.ok()) { + timestamp_ = /* something clearly fabricated */ 1; + } + } int64_t GetTimestamp() const { return timestamp_; } @@ -384,7 +387,7 @@ class BackupEngineImpl : public BackupEngine { Status ReadFileAndComputeChecksum(const std::string& src, Env* src_env, const EnvOptions& src_env_options, uint64_t size_limit, - std::string* checksum_hex); + std::string* checksum_hex) const; // Obtain db_id and db_session_id from the table properties of file_path Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options, @@ -544,11 +547,18 @@ class BackupEngineImpl : public BackupEngine { struct RestoreAfterCopyOrCreateWorkItem { std::future result; + std::string from_file; + std::string to_file; std::string checksum_hex; RestoreAfterCopyOrCreateWorkItem() : checksum_hex("") {} RestoreAfterCopyOrCreateWorkItem(std::future&& _result, + const std::string& _from_file, + const std::string& _to_file, const std::string& _checksum_hex) - : result(std::move(_result)), checksum_hex(_checksum_hex) {} + : result(std::move(_result)), + from_file(_from_file), + to_file(_to_file), + checksum_hex(_checksum_hex) {} RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT { *this = std::move(o); @@ -564,9 +574,10 @@ class BackupEngineImpl : public BackupEngine { bool initialized_; std::mutex byte_report_mutex_; - channel files_to_copy_or_create_; + mutable channel files_to_copy_or_create_; std::vector threads_; std::atomic 
threads_cpu_priority_; + // Certain operations like PurgeOldBackups and DeleteBackup will trigger // automatic GarbageCollect (true) unless we've already done one in this // session and have not failed to delete backup files since then (false). @@ -616,7 +627,7 @@ class BackupEngineImpl : public BackupEngine { std::unique_ptr private_directory_; static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB - size_t copy_file_buffer_size_; + mutable size_t copy_file_buffer_size_; bool read_only_; BackupStatistics backup_statistics_; std::unordered_set reported_ignored_fields_; @@ -626,10 +637,101 @@ class BackupEngineImpl : public BackupEngine { std::unique_ptr test_future_options_; }; +// -------- BackupEngineImplThreadSafe class --------- +// This locking layer for thread safety in the public API is layered on +// top to prevent accidental recursive locking with RWMutex, which is UB. +// Note: BackupEngineReadOnlyBase inherited twice, but has no fields +class BackupEngineImplThreadSafe : public BackupEngine, + public BackupEngineReadOnly { + public: + BackupEngineImplThreadSafe(const BackupableDBOptions& options, Env* db_env, + bool read_only = false) + : impl_(options, db_env, read_only) {} + ~BackupEngineImplThreadSafe() override {} + + using BackupEngine::CreateNewBackupWithMetadata; + Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db, + const std::string& app_metadata) override { + WriteLock lock(&mutex_); + return impl_.CreateNewBackupWithMetadata(options, db, app_metadata); + } + + Status PurgeOldBackups(uint32_t num_backups_to_keep) override { + WriteLock lock(&mutex_); + return impl_.PurgeOldBackups(num_backups_to_keep); + } + + Status DeleteBackup(BackupID backup_id) override { + WriteLock lock(&mutex_); + return impl_.DeleteBackup(backup_id); + } + + void StopBackup() override { + // No locking needed + impl_.StopBackup(); + } + + Status GarbageCollect() override { + WriteLock lock(&mutex_); + return 
impl_.GarbageCollect(); + } + + void GetBackupInfo(std::vector* backup_info, + bool include_file_details) const override { + ReadLock lock(&mutex_); + impl_.GetBackupInfo(backup_info, include_file_details); + } + + void GetCorruptedBackups( + std::vector* corrupt_backup_ids) const override { + ReadLock lock(&mutex_); + impl_.GetCorruptedBackups(corrupt_backup_ids); + } + + using BackupEngine::RestoreDBFromBackup; + Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id, + const std::string& db_dir, + const std::string& wal_dir) const override { + ReadLock lock(&mutex_); + return impl_.RestoreDBFromBackup(options, backup_id, db_dir, wal_dir); + } + + using BackupEngine::RestoreDBFromLatestBackup; + Status RestoreDBFromLatestBackup(const RestoreOptions& options, + const std::string& db_dir, + const std::string& wal_dir) const override { + // Defer to above function, which locks + return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir); + } + + Status VerifyBackup(BackupID backup_id, + bool verify_with_checksum = false) const override { + ReadLock lock(&mutex_); + return impl_.VerifyBackup(backup_id, verify_with_checksum); + } + + // Not public API but needed + Status Initialize() { + // No locking needed + return impl_.Initialize(); + } + + // Not public API but used in testing + void TEST_EnableWriteFutureSchemaVersion2( + const TEST_FutureSchemaVersion2Options& options) { + impl_.test_future_options_.reset( + new TEST_FutureSchemaVersion2Options(options)); + } + + private: + mutable port::RWMutex mutex_; + BackupEngineImpl impl_; +}; + Status BackupEngine::Open(const BackupableDBOptions& options, Env* env, BackupEngine** backup_engine_ptr) { - std::unique_ptr backup_engine( - new BackupEngineImpl(options, env)); + std::unique_ptr backup_engine( + new BackupEngineImplThreadSafe(options, env)); auto s = backup_engine->Initialize(); if (!s.ok()) { *backup_engine_ptr = nullptr; @@ -738,13 +840,11 @@ Status 
BackupEngineImpl::Initialize() { BackupID backup_id = 0; sscanf(file.c_str(), "%u", &backup_id); if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) { - if (!read_only_) { - // invalid file name, delete that - auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file); - ROCKS_LOG_INFO(options_.info_log, - "Unrecognized meta file %s, deleting -- %s", - file.c_str(), s.ToString().c_str()); - } + // Invalid file name, will be deleted with auto-GC when user + // initiates an append or write operation. (Behave as read-only until + // then.) + ROCKS_LOG_INFO(options_.info_log, "Skipping unrecognized meta file %s", + file.c_str()); continue; } assert(backups_.find(backup_id) == backups_.end()); @@ -783,7 +883,7 @@ Status BackupEngineImpl::Initialize() { {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) { const auto abs_dir = GetAbsolutePath(rel_dir); Status s = - InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size); + ReadChildFileCurrentSizes(abs_dir, backup_env_, &abs_path_to_size); if (!s.ok()) { // I/O error likely impacting all backups return s; @@ -805,7 +905,7 @@ Status BackupEngineImpl::Initialize() { // Insert files and their sizes in backup sub-directories // (private/backup_id) to abs_path_to_size - Status s = InsertPathnameToSizeBytes( + Status s = ReadChildFileCurrentSizes( GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_, &abs_path_to_size); if (s.ok()) { @@ -969,8 +1069,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( &backuped_file_infos_, backup_env_)))); assert(ret.second == true); auto& new_backup = ret.first->second; - // TODO: What should we do on error here? 
- new_backup->RecordTimestamp().PermitUncheckedError(); + new_backup->RecordTimestamp(); new_backup->SetAppMetadata(app_metadata); auto start_backup = backup_env_->NowMicros(); @@ -1191,7 +1290,8 @@ Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) { itr++; } for (auto backup_id : to_delete) { - auto s = DeleteBackupInternal(backup_id); + // Do not GC until end + auto s = DeleteBackupNoGC(backup_id); if (!s.ok()) { overall_status = s; } @@ -1208,7 +1308,7 @@ Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) { } Status BackupEngineImpl::DeleteBackup(BackupID backup_id) { - auto s1 = DeleteBackupInternal(backup_id); + auto s1 = DeleteBackupNoGC(backup_id); auto s2 = Status::OK(); // Clean up after any incomplete backup deletion, potentially from @@ -1218,15 +1318,17 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) { } if (!s1.ok()) { - s2.PermitUncheckedError(); // What to do? + // Any failure in the primary objective trumps any failure in the + // secondary objective. 
+ s2.PermitUncheckedError(); return s1; } else { return s2; } } -// Does not auto-GarbageCollect -Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) { +// Does not auto-GarbageCollect nor lock +Status BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) { assert(initialized_); assert(!read_only_); @@ -1319,8 +1421,12 @@ void BackupEngineImpl::GetCorruptedBackups( Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id, const std::string& db_dir, - const std::string& wal_dir) { + const std::string& wal_dir) const { assert(initialized_); + if (backup_id == kLatestBackupIDMarker) { + // Note: Read latest_valid_backup_id_ inside of lock + backup_id = latest_valid_backup_id_; + } auto corrupt_itr = corrupt_backups_.find(backup_id); if (corrupt_itr != corrupt_backups_.end()) { return corrupt_itr->second.first; @@ -1417,7 +1523,8 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, EnvOptions() /* src_env_options */, false, rate_limiter, 0 /* size_limit */); RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( - copy_or_create_work_item.result.get_future(), file_info->checksum_hex); + copy_or_create_work_item.result.get_future(), file, dst, + file_info->checksum_hex); files_to_copy_or_create_.write(std::move(copy_or_create_work_item)); restore_items_to_finish.push_back( std::move(after_copy_or_create_work_item)); @@ -1434,7 +1541,10 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, break; } else if (!item.checksum_hex.empty() && item.checksum_hex != result.checksum_hex) { - s = Status::Corruption("Checksum check failed"); + s = Status::Corruption( + "While restoring " + item.from_file + " -> " + item.to_file + + ": expected checksum is " + item.checksum_hex + + " while computed checksum is " + result.checksum_hex); break; } } @@ -1445,9 +1555,9 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, } Status 
BackupEngineImpl::VerifyBackup(BackupID backup_id, - bool verify_with_checksum) { - // Check if backup_id is corrupted, or valid and registered + bool verify_with_checksum) const { assert(initialized_); + // Check if backup_id is corrupted, or valid and registered auto corrupt_itr = corrupt_backups_.find(backup_id); if (corrupt_itr != corrupt_backups_.end()) { return corrupt_itr->second.first; @@ -1470,8 +1580,9 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id, for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(), GetSharedFileWithChecksumRel()}) { const auto abs_dir = GetAbsolutePath(rel_dir); - // TODO: What to do on error? - InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size) + // Shared directories allowed to be missing in some cases. Expected but + // missing files will be reported a few lines down. + ReadChildFileCurrentSizes(abs_dir, backup_env_, &curr_abs_path_to_size) .PermitUncheckedError(); } @@ -1742,7 +1853,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem( "overwrite the file.", fname.c_str()); need_to_copy = true; - //**TODO: What to do on error? 
+ // Defer any failure reporting to when we try to write the file backup_env_->DeleteFile(final_dest_path).PermitUncheckedError(); } else { // file exists and referenced @@ -1831,7 +1942,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem( Status BackupEngineImpl::ReadFileAndComputeChecksum( const std::string& src, Env* src_env, const EnvOptions& src_env_options, - uint64_t size_limit, std::string* checksum_hex) { + uint64_t size_limit, std::string* checksum_hex) const { if (checksum_hex == nullptr) { return Status::Aborted("Checksum pointer is null"); } @@ -1927,7 +2038,7 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env, } void BackupEngineImpl::DeleteChildren(const std::string& dir, - uint32_t file_type_filter) { + uint32_t file_type_filter) const { std::vector children; db_env_->GetChildren(dir, &children).PermitUncheckedError(); // ignore errors @@ -1943,9 +2054,9 @@ void BackupEngineImpl::DeleteChildren(const std::string& dir, } } -Status BackupEngineImpl::InsertPathnameToSizeBytes( +Status BackupEngineImpl::ReadChildFileCurrentSizes( const std::string& dir, Env* env, - std::unordered_map* result) { + std::unordered_map* result) const { assert(result != nullptr); std::vector files_attrs; Status status = env->FileExists(dir); @@ -2507,60 +2618,14 @@ Status BackupEngineImpl::BackupMeta::StoreToFile( return s; } -// -------- BackupEngineReadOnlyImpl --------- -class BackupEngineReadOnlyImpl : public BackupEngineReadOnly { - public: - BackupEngineReadOnlyImpl(const BackupableDBOptions& options, Env* db_env) - : backup_engine_(new BackupEngineImpl(options, db_env, true)) {} - - ~BackupEngineReadOnlyImpl() override {} - - // The returned BackupInfos are in chronological order, which means the - // latest backup comes last. 
- void GetBackupInfo(std::vector* backup_info, - bool include_file_details) const override { - backup_engine_->GetBackupInfo(backup_info, include_file_details); - } - - void GetCorruptedBackups( - std::vector* corrupt_backup_ids) const override { - backup_engine_->GetCorruptedBackups(corrupt_backup_ids); - } - - using BackupEngineReadOnly::RestoreDBFromBackup; - Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id, - const std::string& db_dir, - const std::string& wal_dir) override { - return backup_engine_->RestoreDBFromBackup(options, backup_id, db_dir, - wal_dir); - } - - using BackupEngineReadOnly::RestoreDBFromLatestBackup; - Status RestoreDBFromLatestBackup(const RestoreOptions& options, - const std::string& db_dir, - const std::string& wal_dir) override { - return backup_engine_->RestoreDBFromLatestBackup(options, db_dir, wal_dir); - } - - Status VerifyBackup(BackupID backup_id, - bool verify_with_checksum = false) override { - return backup_engine_->VerifyBackup(backup_id, verify_with_checksum); - } - - Status Initialize() { return backup_engine_->Initialize(); } - - private: - std::unique_ptr backup_engine_; -}; - Status BackupEngineReadOnly::Open(const BackupableDBOptions& options, Env* env, BackupEngineReadOnly** backup_engine_ptr) { if (options.destroy_old_data) { return Status::InvalidArgument( "Can't destroy old data with ReadOnly BackupEngine"); } - std::unique_ptr backup_engine( - new BackupEngineReadOnlyImpl(options, env)); + std::unique_ptr backup_engine( + new BackupEngineImplThreadSafe(options, env, true /*read_only*/)); auto s = backup_engine->Initialize(); if (!s.ok()) { *backup_engine_ptr = nullptr; @@ -2572,9 +2637,9 @@ Status BackupEngineReadOnly::Open(const BackupableDBOptions& options, Env* env, void TEST_EnableWriteFutureSchemaVersion2( BackupEngine* engine, const TEST_FutureSchemaVersion2Options& options) { - BackupEngineImpl* impl = static_cast_with_check(engine); - impl->test_future_options_.reset( - new 
TEST_FutureSchemaVersion2Options(options)); + BackupEngineImplThreadSafe* impl = + static_cast_with_check(engine); + impl->TEST_EnableWriteFutureSchemaVersion2(options); } } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 3096f3374..d39cc2cc9 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -12,7 +12,9 @@ #include "rocksdb/utilities/backupable_db.h" #include +#include #include +#include #include #include #include @@ -2975,6 +2977,142 @@ TEST_F(BackupableDBTest, FutureMetaSchemaVersion2_Restore) { } } +TEST_F(BackupableDBTest, Concurrency) { + // Check that we can simultaneously: + // * Run several read operations in different threads on a single + // BackupEngine object, and + // * With another BackupEngine object on the same + // backup_dir, run the same read operations in another thread, and + // * With yet another BackupEngine object on the same + // backup_dir, create two new backups in parallel threads. + // + // Because of the challenges of integrating this into db_stress, + // this is a non-deterministic mini-stress test here instead. + OpenDBAndBackupEngine(true, false, kShareWithChecksum); + + static constexpr int keys_iteration = 5000; + FillDB(db_.get(), 0, keys_iteration); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + + FillDB(db_.get(), keys_iteration, 2 * keys_iteration); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + + static constexpr int max_factor = 3; + FillDB(db_.get(), 2 * keys_iteration, max_factor * keys_iteration); + // will create another backup soon... 
+ + Options db_opts = options_; + db_opts.wal_dir = ""; + BackupableDBOptions be_opts = *backupable_options_; + be_opts.destroy_old_data = false; + + std::mt19937 rng{std::random_device()()}; + + std::array read_threads; + for (uint32_t i = 0; i < read_threads.size(); ++i) { + uint32_t sleep_micros = rng() % 100000; + read_threads[i] = std::thread([this, i, sleep_micros, &db_opts, &be_opts] { + test_db_env_->SleepForMicroseconds(sleep_micros); + + // Whether to also re-open the BackupEngine, potentially seeing + // additional backups + bool reopen = i == 3; + // Whether we are going to restore "latest" + bool latest = i > 1; + + BackupEngine* my_be; + if (reopen) { + ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be)); + } else { + my_be = backup_engine_.get(); + } + + // Verify metadata (we don't receive updates from concurrently + // creating a new backup) + std::vector infos; + my_be->GetBackupInfo(&infos); + const uint32_t count = static_cast(infos.size()); + infos.clear(); + if (reopen) { + ASSERT_GE(count, 2U); + ASSERT_LE(count, 4U); + fprintf(stderr, "Reopen saw %u backups\n", count); + } else { + ASSERT_EQ(count, 2U); + } + std::vector ids; + my_be->GetCorruptedBackups(&ids); + ASSERT_EQ(ids.size(), 0U); + + // Restore one of the backups, or "latest" + std::string restore_db_dir = dbname_ + "/restore" + ToString(i); + BackupID to_restore; + if (latest) { + to_restore = count; + ASSERT_OK( + my_be->RestoreDBFromLatestBackup(restore_db_dir, restore_db_dir)); + } else { + to_restore = i + 1; + ASSERT_OK(my_be->VerifyBackup(to_restore, true)); + ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir, + restore_db_dir)); + } + + // Open restored DB to verify its contents + DB* restored; + ASSERT_OK(DB::Open(db_opts, restore_db_dir, &restored)); + int factor = std::min(static_cast(to_restore), max_factor); + AssertExists(restored, 0, factor * keys_iteration); + AssertEmpty(restored, factor * keys_iteration, + (factor + 1) * 
keys_iteration); + delete restored; + + // Re-verify metadata (we don't receive updates from concurrently + // creating a new backup) + my_be->GetBackupInfo(&infos); + ASSERT_EQ(infos.size(), count); + my_be->GetCorruptedBackups(&ids); + ASSERT_EQ(ids.size(), 0); + // fprintf(stderr, "Finished read thread\n"); + + if (reopen) { + delete my_be; + } + }); + } + + BackupEngine* alt_be; + ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &alt_be)); + + std::array append_threads; + for (unsigned i = 0; i < append_threads.size(); ++i) { + uint32_t sleep_micros = rng() % 100000; + append_threads[i] = std::thread([this, sleep_micros, alt_be] { + test_db_env_->SleepForMicroseconds(sleep_micros); + // WART: CreateNewBackup doesn't tell you the BackupID it just created, + // which is ugly for multithreaded setting. + // TODO: add delete backup also when that is added + ASSERT_OK(alt_be->CreateNewBackup(db_.get())); + // fprintf(stderr, "Finished append thread\n"); + }); + } + + for (auto& t : append_threads) { + t.join(); + } + // Verify metadata + std::vector infos; + alt_be->GetBackupInfo(&infos); + ASSERT_EQ(infos.size(), 2 + append_threads.size()); + + for (auto& t : read_threads) { + t.join(); + } + + delete alt_be; + CloseDBAndBackupEngine(); +} + TEST_F(BackupableDBTest, LimitBackupsOpened) { // Verify the specified max backups are opened, including skipping over // corrupted backups.