//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cinttypes>

#include "db/builder.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "db/periodic_task_scheduler.h"
#include "env/composite_env_wrapper.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "rocksdb/table.h"
#include "rocksdb/wal_filter.h"
#include "test_util/sync_point.h"
#include "util/rate_limiter_impl.h"
#include "util/udt_util.h"

namespace ROCKSDB_NAMESPACE {
Options SanitizeOptions(const std::string& dbname, const Options& src,
                        bool read_only, Status* logger_creation_s) {
  auto db_options =
      SanitizeOptions(dbname, DBOptions(src), read_only, logger_creation_s);
  ImmutableDBOptions immutable_db_options(db_options);
  auto cf_options =
      SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src));
  return Options(db_options, cf_options);
}

DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
                          bool read_only, Status* logger_creation_s) {
  DBOptions result(src);

  if (result.env == nullptr) {
    result.env = Env::Default();
  }

  // result.max_open_files of -1 means an "infinite" number of open files.
  if (result.max_open_files != -1) {
    int max_max_open_files = port::GetMaxOpenFiles();
    if (max_max_open_files == -1) {
      max_max_open_files = 0x400000;
    }
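    // Clamp max_open_files into [20, max_max_open_files] (0x400000, ~4.2M,
    // is the fallback cap when the port layer cannot report a limit).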
    ClipToRange(&result.max_open_files, 20, max_max_open_files);
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
                             &result.max_open_files);
  }

  if (result.info_log == nullptr && !read_only) {
    Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
      if (logger_creation_s) {
        *logger_creation_s = s;
      }
    }
  }

  if (!result.write_buffer_manager) {
    result.write_buffer_manager.reset(
        new WriteBufferManager(result.db_write_buffer_size));
  }
  auto bg_job_limits = DBImpl::GetBGJobLimits(
      result.max_background_flushes, result.max_background_compactions,
      result.max_background_jobs, true /* parallelize_compactions */);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
                                           Env::Priority::LOW);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
                                           Env::Priority::HIGH);

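  // If a rate limiter is configured, default bytes_per_sync to 1MB so that
  // background writes are synced out incrementally (likely to keep syncs
  // from bursting while writes are being rate-limited).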
  if (result.rate_limiter.get() != nullptr) {
    if (result.bytes_per_sync == 0) {
      result.bytes_per_sync = 1024 * 1024;
    }
  }

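  // Pick a default delayed write rate: prefer the rate limiter's configured
  // bytes-per-second, and fall back to 16MB/s if no rate limiter is set.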
  if (result.delayed_write_rate == 0) {
    if (result.rate_limiter.get() != nullptr) {
      result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
    }
    if (result.delayed_write_rate == 0) {
      result.delayed_write_rate = 16 * 1024 * 1024;
    }
  }

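  // WAL archiving (WAL_ttl_seconds / WAL_size_limit_MB) needs obsolete WALs
  // to be preserved in the archive, which presumably cannot be done if the
  // files are recycled, so recycling is turned off here.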
  if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
    result.recycle_log_file_num = 0;
  }

  if (result.recycle_log_file_num &&
      (result.wal_recovery_mode ==
           WALRecoveryMode::kTolerateCorruptedTailRecords ||
       result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
       result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
    // - kTolerateCorruptedTailRecords is inconsistent with recycle log file
    //   feature. WAL recycling expects recovery success upon encountering a
    //   corrupt record at the point where new data ends and recycled data
    //   remains at the tail. However, `kTolerateCorruptedTailRecords` must fail
    //   upon encountering any such corrupt record, as it cannot differentiate
    //   between this and a real corruption, which would cause committed updates
    //   to be truncated -- a violation of the recovery guarantee.
    // - kPointInTimeRecovery and kAbsoluteConsistency are incompatible with
    //   recycle log file feature temporarily due to a bug found introducing a
    //   hole in the recovered data
    //   (https://github.com/facebook/rocksdb/pull/7252#issuecomment-673766236).
    //   Besides this bug, we believe the features are fundamentally compatible.
    result.recycle_log_file_num = 0;
  }

  if (result.db_paths.size() == 0) {
    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
  } else if (result.wal_dir.empty()) {
    // Use dbname as default
    result.wal_dir = dbname;
  }
  if (!result.wal_dir.empty()) {
    // If there is a wal_dir already set, check to see if the wal_dir is the
    // same as the dbname AND the same as the db_path[0] (which must exist from
    // a few lines ago). If the wal_dir matches both of these values, then clear
    // the wal_dir value, which will make wal_dir == dbname.  Most likely this
    // condition was the result of reading an old options file where we forced
    // wal_dir to be set (to dbname).
    auto npath = NormalizePath(dbname + "/");
    if (npath == NormalizePath(result.wal_dir + "/") &&
        npath == NormalizePath(result.db_paths[0].path + "/")) {
      result.wal_dir.clear();
    }
  }

  if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
  }

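  // Default the compaction readahead to 2MB when unset; compaction inputs
  // are read with this readahead size.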
  if (result.compaction_readahead_size == 0) {
    if (result.use_direct_reads) {
      TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
    }
    result.compaction_readahead_size = 1024 * 1024 * 2;
  }

  // Force flush on DB open if 2PC is enabled, since with 2PC we have no
  // guarantee that consecutive log files have consecutive sequence ids, which
  // makes recovery complicated.
  if (result.allow_2pc) {
    result.avoid_flush_during_recovery = false;
  }

  ImmutableDBOptions immutable_db_options(result);
  if (!immutable_db_options.IsWalDirSameAsDBPath()) {
    // Either the WAL dir and db_paths[0]/db_name are not the same, or we
    // cannot tell for sure. In either case, assume they're different and
    // explicitly clean up the trash log files (bypassing DeleteScheduler).
    // Do this first so that even if we end up calling
    // DeleteScheduler::CleanupDirectory on the same dir later, it will be
    // safe.
    std::vector<std::string> filenames;
    IOOptions io_opts;
    io_opts.do_not_recurse = true;
    auto wal_dir = immutable_db_options.GetWalDir();
    Status s = immutable_db_options.fs->GetChildren(
        wal_dir, io_opts, &filenames, /*dbg=*/nullptr);
    s.PermitUncheckedError();  // TODO: What to do on error?
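    // Delete any leftover file whose name ends in ".log.trash". Starting
    // find() at (length - suffix length) makes this a suffix match; for
    // names shorter than the suffix, the unsigned offset wraps and find()
    // returns npos, so they are safely skipped.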
    for (std::string& filename : filenames) {
      if (filename.find(".log.trash", filename.length() -
                                          std::string(".log.trash").length()) !=
          std::string::npos) {
        std::string trash_file = wal_dir + "/" + filename;
        result.env->DeleteFile(trash_file).PermitUncheckedError();
      }
    }
  }
  // When the DB is stopped, it's possible that some .trash files were not
  // deleted yet. When we open the DB, we will find these .trash files and
  // schedule them to be deleted (or delete them immediately if SstFileManager
  // was not used).
  auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
  for (size_t i = 0; i < result.db_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path)
        .PermitUncheckedError();
  }

  // Create a default SstFileManager for purposes of tracking compaction size
  // and facilitating recovery from out of space errors.
  if (result.sst_file_manager.get() == nullptr) {
    std::shared_ptr<SstFileManager> sst_file_manager(
        NewSstFileManager(result.env, result.info_log));
    result.sst_file_manager = sst_file_manager;
  }

  // Disable WAL compression if the chosen type is not supported
  if (!StreamingCompressionTypeSupported(result.wal_compression)) {
    result.wal_compression = kNoCompression;
    ROCKS_LOG_WARN(result.info_log,
                   "wal_compression is disabled since only zstd is supported");
  }

  if (!result.paranoid_checks) {
    result.skip_checking_sst_file_sizes_on_db_open = true;
    ROCKS_LOG_INFO(result.info_log,
                   "file size check will be skipped during open.");
  }

  return result;
}

namespace {
Status ValidateOptionsByTable(
    const DBOptions& db_opts,
    const std::vector<ColumnFamilyDescriptor>& column_families) {
  Status s;
  for (auto& cf : column_families) {
    s = ValidateOptions(db_opts, cf.options);
    if (!s.ok()) {
      return s;
    }
  }
  return Status::OK();
}
}  // namespace

Status DBImpl::ValidateOptions(
    const DBOptions& db_options,
    const std::vector<ColumnFamilyDescriptor>& column_families) {
  Status s;
  for (auto& cfd : column_families) {
    s = ColumnFamilyData::ValidateOptions(db_options, cfd.options);
    if (!s.ok()) {
      return s;
    }
  }
  s = ValidateOptions(db_options);
  return s;
}

Status DBImpl::ValidateOptions(const DBOptions& db_options) {
  if (db_options.db_paths.size() > 4) {
    return Status::NotSupported(
        "More than four DB paths are not supported yet. ");
  }

  if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
    // Protect against assert in PosixMMapReadableFile constructor
    return Status::NotSupported(
        "If memory mapped reads (allow_mmap_reads) are enabled "
        "then direct I/O reads (use_direct_reads) must be disabled. ");
  }

  if (db_options.allow_mmap_writes &&
      db_options.use_direct_io_for_flush_and_compaction) {
    return Status::NotSupported(
        "If memory mapped writes (allow_mmap_writes) are enabled "
        "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
        "be disabled. ");
  }

  if (db_options.keep_log_file_num == 0) {
    return Status::InvalidArgument("keep_log_file_num must be greater than 0");
  }

  if (db_options.unordered_write &&
      !db_options.allow_concurrent_memtable_write) {
    return Status::InvalidArgument(
        "unordered_write is incompatible with "
        "!allow_concurrent_memtable_write");
  }

  if (db_options.unordered_write && db_options.enable_pipelined_write) {
    return Status::InvalidArgument(
        "unordered_write is incompatible with enable_pipelined_write");
  }

  if (db_options.atomic_flush && db_options.enable_pipelined_write) {
    return Status::InvalidArgument(
        "atomic_flush is incompatible with enable_pipelined_write");
  }

  // TODO remove this restriction
  if (db_options.atomic_flush && db_options.best_efforts_recovery) {
    return Status::InvalidArgument(
        "atomic_flush is currently incompatible with best-efforts recovery");
  }

  if (db_options.use_direct_io_for_flush_and_compaction &&
      0 == db_options.writable_file_max_buffer_size) {
    return Status::InvalidArgument(
        "writes in direct IO require writable_file_max_buffer_size > 0");
  }

  return Status::OK();
}

Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
  VersionEdit new_db;
  Status s = SetIdentityFile(env_, dbname_);
  if (!s.ok()) {
    return s;
  }
  if (immutable_db_options_.write_dbid_to_manifest) {
    std::string temp_db_id;
    GetDbIdentityFromIdentityFile(&temp_db_id);
    new_db.SetDBId(temp_db_id);
  }
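  // File number 1 is reserved for the initial MANIFEST created below, so the
  // next file number starts at 2.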
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
  const std::string manifest = DescriptorFileName(dbname_, 1);
  {
    if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
      fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
    }
    std::unique_ptr<FSWritableFile> file;
    FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
    s = NewWritableFile(fs_.get(), manifest, &file, file_options);
    if (!s.ok()) {
      return s;
    }
    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
    file->SetPreallocationBlockSize(
        immutable_db_options_.manifest_preallocation_size);
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
        std::move(file), manifest, file_options, immutable_db_options_.clock,
        io_tracer_, nullptr /* stats */, immutable_db_options_.listeners,
        nullptr, tmp_set.Contains(FileType::kDescriptorFile),
        tmp_set.Contains(FileType::kDescriptorFile)));
    log::Writer log(std::move(file_writer), 0, false);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      s = SyncManifest(&immutable_db_options_, log.file());
    }
  }
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(fs_.get(), dbname_, 1, directories_.GetDbDir());
    if (new_filenames) {
      new_filenames->emplace_back(
          manifest.substr(manifest.find_last_of("/\\") + 1));
    }
  } else {
    fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
  }
  return s;
}

IOStatus DBImpl::CreateAndNewDirectory(
    FileSystem* fs, const std::string& dirname,
    std::unique_ptr<FSDirectory>* directory) {
  // We call CreateDirIfMissing() as the directory may already exist (if we
  // are reopening a DB), when this happens we don't want creating the
  // directory to cause an error. However, we need to check if creating the
  // directory fails or else we may get an obscure message about the lock
  // file not existing. One real-world example of this occurring is if
  // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
  // when dbname_ is "dir/db" but when "dir" doesn't exist.
  IOStatus io_s = fs->CreateDirIfMissing(dirname, IOOptions(), nullptr);
  if (!io_s.ok()) {
    return io_s;
  }
  return fs->NewDirectory(dirname, IOOptions(), directory, nullptr);
}

IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
                                     const std::string& wal_dir,
                                     const std::vector<DbPath>& data_paths) {
  IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_);
  if (!io_s.ok()) {
    return io_s;
  }
  if (!wal_dir.empty() && dbname != wal_dir) {
    io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_);
    if (!io_s.ok()) {
      return io_s;
    }
  }

  data_dirs_.clear();
  for (auto& p : data_paths) {
    const std::string db_path = p.path;
    if (db_path == dbname) {
      data_dirs_.emplace_back(nullptr);
    } else {
      std::unique_ptr<FSDirectory> path_directory;
      io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory);
      if (!io_s.ok()) {
        return io_s;
      }
      data_dirs_.emplace_back(path_directory.release());
    }
  }
  assert(data_dirs_.size() == data_paths.size());
  return IOStatus::OK();
}

Status DBImpl::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
    uint64_t* recovered_seq, RecoveryContext* recovery_ctx) {
  mutex_.AssertHeld();

  bool is_new_db = false;
  assert(db_lock_ == nullptr);
  std::vector<std::string> files_in_dbname;
  if (!read_only) {
    Status s = directories_.SetDirectories(fs_.get(), dbname_,
                                           immutable_db_options_.wal_dir,
                                           immutable_db_options_.db_paths);
    if (!s.ok()) {
      return s;
    }

    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) {
      return s;
    }

    std::string current_fname = CurrentFileName(dbname_);
    // Path to any MANIFEST file in the db dir. It does not matter which one.
    // Since best-efforts recovery ignores the CURRENT file, the existence of
    // a MANIFEST indicates there is an existing db to recover. If no MANIFEST
    // can be found, a new db will be created.
    std::string manifest_path;
    if (!immutable_db_options_.best_efforts_recovery) {
      s = env_->FileExists(current_fname);
    } else {
      s = Status::NotFound();
      IOOptions io_opts;
      io_opts.do_not_recurse = true;
      Status io_s = immutable_db_options_.fs->GetChildren(
          dbname_, io_opts, &files_in_dbname, /*dbg=*/nullptr);
      if (!io_s.ok()) {
        s = io_s;
        files_in_dbname.clear();
      }
      for (const std::string& file : files_in_dbname) {
        uint64_t number = 0;
        FileType type = kWalFile;  // initialize
        if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
          uint64_t bytes;
          s = env_->GetFileSize(DescriptorFileName(dbname_, number), &bytes);
          if (s.ok() && bytes != 0) {
            // Found non-empty MANIFEST (descriptor log), thus best-efforts
            // recovery does not have to treat the db as empty.
            manifest_path = dbname_ + "/" + file;
            break;
          }
        }
      }
    }
    if (s.IsNotFound()) {
      if (immutable_db_options_.create_if_missing) {
        s = NewDB(&files_in_dbname);
        is_new_db = true;
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            current_fname, "does not exist (create_if_missing is false)");
      }
    } else if (s.ok()) {
      if (immutable_db_options_.error_if_exists) {
        return Status::InvalidArgument(dbname_,
                                       "exists (error_if_exists is true)");
      }
    } else {
      // Unexpected error reading file
      assert(s.IsIOError());
      return s;
    }
    // Verify compatibility of file_options_ and filesystem
    {
      std::unique_ptr<FSRandomAccessFile> idfile;
      FileOptions customized_fs(file_options_);
      customized_fs.use_direct_reads |=
          immutable_db_options_.use_direct_io_for_flush_and_compaction;
      const std::string& fname =
          manifest_path.empty() ? current_fname : manifest_path;
      s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
      if (!s.ok()) {
        std::string error_str = s.ToString();
        // Check if unsupported Direct I/O is the root cause
        customized_fs.use_direct_reads = false;
        s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
        if (s.ok()) {
          return Status::InvalidArgument(
              "Direct I/O is not supported by the specified DB.");
        } else {
          return Status::InvalidArgument(
              "Found options incompatible with filesystem", error_str.c_str());
        }
      }
    }
  } else if (immutable_db_options_.best_efforts_recovery) {
    assert(files_in_dbname.empty());
    IOOptions io_opts;
    io_opts.do_not_recurse = true;
    Status s = immutable_db_options_.fs->GetChildren(
        dbname_, io_opts, &files_in_dbname, /*dbg=*/nullptr);
    if (s.IsNotFound()) {
      return Status::InvalidArgument(dbname_,
                                     "does not exist (open for read only)");
    } else if (s.IsIOError()) {
      return s;
    }
    assert(s.ok());
  }
  assert(db_id_.empty());
  Status s;
  bool missing_table_file = false;
  if (!immutable_db_options_.best_efforts_recovery) {
    s = versions_->Recover(column_families, read_only, &db_id_);
  } else {
    assert(!files_in_dbname.empty());
    s = versions_->TryRecover(column_families, read_only, files_in_dbname,
                              &db_id_, &missing_table_file);
    if (s.ok()) {
      // TryRecover may delete previous column_family_set_.
      column_family_memtables_.reset(
          new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (s.ok() && !read_only) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      // Try to trivially move files down the LSM tree to start from the
      // bottommost level when level_compaction_dynamic_level_bytes is
      // enabled. This should only be useful when the user is migrating to
      // turning on this option. If a user is migrating from Level Compaction
      // with a smaller level multiplier or from Universal Compaction, there
      // may be too many non-empty levels, and the trivial moves here do not
      // suffice for migration. Additional compactions are needed to drain
      // unnecessary levels.
      //
      // Note that this step moves files down LSM without consulting
      // SSTPartitioner. Further compactions are still needed if
      // the user wants to partition SST files.
      // Note that files moved in this step may not respect the compression
      // option in target level.
      if (cfd->ioptions()->compaction_style ==
              CompactionStyle::kCompactionStyleLevel &&
          cfd->ioptions()->level_compaction_dynamic_level_bytes &&
          !cfd->GetLatestMutableCFOptions()->disable_auto_compactions) {
        int to_level = cfd->ioptions()->num_levels - 1;
        // The last level may be reserved:
        // allow_ingest_behind does not support Level Compaction, and
        // per_key_placement can have an infinite compaction loop for Level
        // Compaction. Adjust to_level here just to be safe.
        if (cfd->ioptions()->allow_ingest_behind ||
            cfd->ioptions()->preclude_last_level_data_seconds > 0) {
          to_level -= 1;
        }
        // Whether this column family has a level trivially moved
        bool moved = false;
        // Fill the LSM starting from to_level and going up one level at a time.
        // Some loop invariants (when last level is not reserved):
        // - levels in (from_level, to_level] are empty, and
        // - levels in (to_level, last_level] are non-empty.
        for (int from_level = to_level; from_level >= 0; --from_level) {
          const std::vector<FileMetaData*>& level_files =
              cfd->current()->storage_info()->LevelFiles(from_level);
          if (level_files.empty() || from_level == 0) {
            continue;
          }
          assert(from_level <= to_level);
          // Trivial move files from `from_level` to `to_level`
          if (from_level < to_level) {
            if (!moved) {
              // lsm_state will look like "[1,2,3,4,5,6,0]" for an LSM with
              // 7 levels
              std::string lsm_state = "[";
              for (int i = 0; i < cfd->ioptions()->num_levels; ++i) {
                lsm_state += std::to_string(
                    cfd->current()->storage_info()->NumLevelFiles(i));
                if (i < cfd->ioptions()->num_levels - 1) {
                  lsm_state += ",";
                }
              }
              lsm_state += "]";
              ROCKS_LOG_WARN(immutable_db_options_.info_log,
                             "[%s] Trivially move files down the LSM when open "
                             "with level_compaction_dynamic_level_bytes=true,"
                             " lsm_state: %s (Files are moved only if DB "
                             "Recovery is successful).",
                             cfd->GetName().c_str(), lsm_state.c_str());
              moved = true;
            }
            ROCKS_LOG_WARN(
                immutable_db_options_.info_log,
                "[%s] Moving %zu files from level-%d to level-%d",
                cfd->GetName().c_str(), level_files.size(), from_level,
                to_level);
            VersionEdit edit;
            edit.SetColumnFamily(cfd->GetID());
            for (const FileMetaData* f : level_files) {
              edit.DeleteFile(from_level, f->fd.GetNumber());
              edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
                           f->fd.GetFileSize(), f->smallest, f->largest,
                           f->fd.smallest_seqno, f->fd.largest_seqno,
                           f->marked_for_compaction,
                           f->temperature,  // this can be different from
                                            // `last_level_temperature`
                           f->oldest_blob_file_number, f->oldest_ancester_time,
                           f->file_creation_time, f->epoch_number,
                           f->file_checksum, f->file_checksum_func_name,
                           f->unique_id, f->compensated_range_deletion_size,
                           f->tail_size, f->user_defined_timestamps_persisted);
              ROCKS_LOG_WARN(immutable_db_options_.info_log,
                             "[%s] Moving #%" PRIu64
                             " from level-%d to level-%d, %" PRIu64
                             " bytes\n",
                             cfd->GetName().c_str(), f->fd.GetNumber(),
                             from_level, to_level, f->fd.GetFileSize());
            }
            recovery_ctx->UpdateVersionEdits(cfd, edit);
          }
          --to_level;
        }
      }
    }
  }
  s = SetupDBId(read_only, recovery_ctx);
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB ID: %s\n", db_id_.c_str());
  if (s.ok() && !read_only) {
    s = DeleteUnreferencedSstFiles(recovery_ctx);
  }

  if (immutable_db_options_.paranoid_checks && s.ok()) {
    s = CheckConsistency();
  }
  if (s.ok() && !read_only) {
    // TODO: share file descriptors (FSDirectory) with SetDirectories above
    std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      s = cfd->AddDirectories(&created_dirs);
      if (!s.ok()) {
        return s;
      }
    }
  }

  std::vector<std::string> files_in_wal_dir;
  if (s.ok()) {
    // Initialize max_total_in_memory_state_ before recovering the WALs. Log
    // recovery may check this value to decide whether to flush.
    max_total_in_memory_state_ = 0;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }

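    // next_sequence stays kMaxSequenceNumber until the first WAL record is
    // replayed; RecoverLogFiles checks for this sentinel before advancing
    // the version set's sequence counters.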
    SequenceNumber next_sequence(kMaxSequenceNumber);
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();

    // Recover from all newer log files than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that prev_log_number() is no longer used, but we pay
    // attention to it in case we are recovering a database
    // produced by an older version of rocksdb.
    auto wal_dir = immutable_db_options_.GetWalDir();
    if (!immutable_db_options_.best_efforts_recovery) {
      IOOptions io_opts;
      io_opts.do_not_recurse = true;
      s = immutable_db_options_.fs->GetChildren(
          wal_dir, io_opts, &files_in_wal_dir, /*dbg=*/nullptr);
    }
    if (s.IsNotFound()) {
      return Status::InvalidArgument("wal_dir not found", wal_dir);
    } else if (!s.ok()) {
      return s;
    }

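    // Map from WAL number to its full path, for every WAL found in wal_dir.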
    std::unordered_map<uint64_t, std::string> wal_files;
    for (const auto& file : files_in_wal_dir) {
      uint64_t number;
      FileType type;
      if (ParseFileName(file, &number, &type) && type == kWalFile) {
        if (is_new_db) {
          return Status::Corruption(
              "While creating a new Db, wal_dir contains "
              "existing log file: ",
              file);
        } else {
          wal_files[number] = LogFileName(wal_dir, number);
        }
      }
    }

    if (immutable_db_options_.track_and_verify_wals_in_manifest) {
      if (!immutable_db_options_.best_efforts_recovery) {
        // Verify WALs in MANIFEST.
        s = versions_->GetWalSet().CheckWals(env_, wal_files);
      }  // else since best effort recovery does not recover from WALs, no need
         // to check WALs.
    } else if (!versions_->GetWalSet().GetWals().empty()) {
      // Tracking is disabled, so clear previously tracked WALs from the
      // MANIFEST. Otherwise, if WAL tracking is enabled again in the future,
      // the WAL check may fail, since WALs deleted while tracking was
      // disabled are not persisted into the MANIFEST.
      VersionEdit edit;
      WalNumber max_wal_number =
          versions_->GetWalSet().GetWals().rbegin()->first;
      edit.DeleteWalsBefore(max_wal_number + 1);
      assert(recovery_ctx != nullptr);
      assert(versions_->GetColumnFamilySet() != nullptr);
      recovery_ctx->UpdateVersionEdits(
          versions_->GetColumnFamilySet()->GetDefault(), edit);
    }
    if (!s.ok()) {
      return s;
    }

    if (!wal_files.empty()) {
      if (error_if_wal_file_exists) {
        return Status::Corruption(
            "The db was opened in readonly mode with error_if_wal_file_exists "
            "flag but a WAL file already exists");
      } else if (error_if_data_exists_in_wals) {
        for (auto& wal_file : wal_files) {
          uint64_t bytes;
          s = env_->GetFileSize(wal_file.second, &bytes);
          if (s.ok()) {
            if (bytes > 0) {
              return Status::Corruption(
                  "error_if_data_exists_in_wals is set but there is data "
                  "in WAL files.");
            }
          }
        }
      }
    }

    if (!wal_files.empty()) {
      // Recover in the order in which the wals were generated
      std::vector<uint64_t> wals;
      wals.reserve(wal_files.size());
      for (const auto& wal_file : wal_files) {
        wals.push_back(wal_file.first);
      }
      std::sort(wals.begin(), wals.end());

      bool corrupted_wal_found = false;
      s = RecoverLogFiles(wals, &next_sequence, read_only, &corrupted_wal_found,
                          recovery_ctx);
      if (corrupted_wal_found && recovered_seq != nullptr) {
        *recovered_seq = next_sequence;
      }
      if (!s.ok()) {
        // Clear memtables if recovery failed
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 kMaxSequenceNumber);
        }
      }
    }
  }

  if (read_only) {
    // If we are opening as read-only, we need to update options_file_number_
    // to reflect the most recent OPTIONS file. It does not matter for regular
    // read-write db instance because options_file_number_ will later be
    // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
    std::vector<std::string> filenames;
    if (s.ok()) {
      const std::string normalized_dbname = NormalizePath(dbname_);
      const std::string normalized_wal_dir =
          NormalizePath(immutable_db_options_.GetWalDir());
      if (immutable_db_options_.best_efforts_recovery) {
        filenames = std::move(files_in_dbname);
      } else if (normalized_dbname == normalized_wal_dir) {
        filenames = std::move(files_in_wal_dir);
      } else {
        IOOptions io_opts;
        io_opts.do_not_recurse = true;
        s = immutable_db_options_.fs->GetChildren(
            GetName(), io_opts, &filenames, /*dbg=*/nullptr);
      }
    }
    if (s.ok()) {
      uint64_t number = 0;
      uint64_t options_file_number = 0;
      FileType type;
      for (const auto& fname : filenames) {
        if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
          options_file_number = std::max(number, options_file_number);
        }
      }
      versions_->options_file_number_ = options_file_number;
      uint64_t options_file_size = 0;
      if (options_file_number > 0) {
        s = env_->GetFileSize(OptionsFileName(GetName(), options_file_number),
                              &options_file_size);
      }
      versions_->options_file_size_ = options_file_size;
    }
  }
  return s;
}

Status DBImpl::PersistentStatsProcessFormatVersion() {
  mutex_.AssertHeld();
  Status s;
  // persist version when stats CF doesn't exist
  bool should_persist_format_version = !persistent_stats_cfd_exists_;
  mutex_.Unlock();
  if (persistent_stats_cfd_exists_) {
    // Check persistent stats format version compatibility. Drop and recreate
    // persistent stats CF if format version is incompatible
    uint64_t format_version_recovered = 0;
    Status s_format = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
    uint64_t compatible_version_recovered = 0;
    Status s_compatible = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kCompatibleVersion,
        &compatible_version_recovered);
    // Abort reading from the existing stats CF if any of the following is
    // true:
    // 1. failed to read the format version or compatible version from disk
    // 2. the sst's format version is greater than the current format version,
    //    meaning the sst was encoded by a newer RocksDB release, and the
    //    current compatible version is below the sst's compatible version
    if (!s_format.ok() || !s_compatible.ok() ||
        (kStatsCFCurrentFormatVersion < format_version_recovered &&
         kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
      if (!s_format.ok() || !s_compatible.ok()) {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Recreating persistent stats column family since reading "
            "persistent stats version key failed. Format key: %s, compatible "
            "key: %s",
            s_format.ToString().c_str(), s_compatible.ToString().c_str());
      } else {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Recreating persistent stats column family due to corrupted or "
            "incompatible format version. Recovered format: %" PRIu64
            "; recovered format compatible since: %" PRIu64 "\n",
            format_version_recovered, compatible_version_recovered);
      }
      s = DropColumnFamily(persist_stats_cf_handle_);
      if (s.ok()) {
        s = DestroyColumnFamilyHandle(persist_stats_cf_handle_);
      }
      ColumnFamilyHandle* handle = nullptr;
      if (s.ok()) {
        ColumnFamilyOptions cfo;
        OptimizeForPersistentStats(&cfo);
        s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
      }
      if (s.ok()) {
        persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
        // should also persist version here because old stats CF is discarded
        should_persist_format_version = true;
      }
    }
  }
  if (should_persist_format_version) {
    // Persistent stats CF being created for the first time, need to write
    // format version key
    WriteBatch batch;
    if (s.ok()) {
      s = batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
                    std::to_string(kStatsCFCurrentFormatVersion));
    }
    if (s.ok()) {
      s = batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
                    std::to_string(kStatsCFCompatibleFormatVersion));
    }
    if (s.ok()) {
      WriteOptions wo;
      wo.low_pri = true;
      wo.no_slowdown = true;
      wo.sync = false;
      s = Write(wo, &batch);
    }
  }
  mutex_.Lock();
  return s;
}

Status DBImpl::InitPersistStatsColumnFamily() {
  mutex_.AssertHeld();
  assert(!persist_stats_cf_handle_);
  ColumnFamilyData* persistent_stats_cfd =
      versions_->GetColumnFamilySet()->GetColumnFamily(
          kPersistentStatsColumnFamilyName);
  persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;

  Status s;
  if (persistent_stats_cfd != nullptr) {
    // We are recovering from a DB which already contains a persistent stats
    // CF. The CF was already created in VersionSet::ApplyOneVersionEdit, but
    // the column family handle was not. Need to explicitly create the handle
    // here.
    persist_stats_cf_handle_ =
        new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
  } else {
    mutex_.Unlock();
    ColumnFamilyHandle* handle = nullptr;
    ColumnFamilyOptions cfo;
    OptimizeForPersistentStats(&cfo);
    s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
    persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
    mutex_.Lock();
  }
  return s;
}

Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
  mutex_.AssertHeld();
  assert(versions_->descriptor_log_ == nullptr);
  const ReadOptions read_options(Env::IOActivity::kDBOpen);
  Status s = versions_->LogAndApply(
      recovery_ctx.cfds_, recovery_ctx.mutable_cf_opts_, read_options,
      recovery_ctx.edit_lists_, &mutex_, directories_.GetDbDir());
  if (s.ok() && !(recovery_ctx.files_to_delete_.empty())) {
    mutex_.Unlock();
    for (const auto& fname : recovery_ctx.files_to_delete_) {
      s = env_->DeleteFile(fname);
      if (!s.ok()) {
        break;
      }
    }
    mutex_.Lock();
  }
  return s;
}

void DBImpl::InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap() {
  if (immutable_db_options_.wal_filter == nullptr) {
    return;
  }
  assert(immutable_db_options_.wal_filter != nullptr);
  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);

  std::map<std::string, uint32_t> cf_name_id_map;
  std::map<uint32_t, uint64_t> cf_lognumber_map;
  assert(versions_);
  assert(versions_->GetColumnFamilySet());
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    assert(cfd);
    cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
    cf_lognumber_map.insert(std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
  }

  wal_filter.ColumnFamilyLogNumberMap(cf_lognumber_map, cf_name_id_map);
}

bool DBImpl::InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
                                                const std::string& wal_fname,
                                                log::Reader::Reporter& reporter,
                                                Status& status,
                                                bool& stop_replay,
                                                WriteBatch& batch) {
  if (immutable_db_options_.wal_filter == nullptr) {
    return true;
  }
  assert(immutable_db_options_.wal_filter != nullptr);
  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);

  WriteBatch new_batch;
  bool batch_changed = false;

  bool process_current_record = true;

  WalFilter::WalProcessingOption wal_processing_option =
      wal_filter.LogRecordFound(wal_number, wal_fname, batch, &new_batch,
                                &batch_changed);

  switch (wal_processing_option) {
    case WalFilter::WalProcessingOption::kContinueProcessing:
      // do nothing, proceed normally
      break;
    case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
      // skip current record
      process_current_record = false;
      break;
    case WalFilter::WalProcessingOption::kStopReplay:
      // skip current record and stop replay
      process_current_record = false;
      stop_replay = true;
      break;
    case WalFilter::WalProcessingOption::kCorruptedRecord: {
      status = Status::Corruption("Corruption reported by Wal Filter ",
                                  wal_filter.Name());
      MaybeIgnoreError(&status);
      if (!status.ok()) {
        process_current_record = false;
        reporter.Corruption(batch.GetDataSize(), status);
      }
      break;
    }
    default: {
      // Logical error that should not happen. If RocksDB used exceptions, we
      // would throw std::logic_error here.
      assert(false);
      status = Status::NotSupported(
          "Unknown WalProcessingOption returned by Wal Filter ",
          wal_filter.Name());
      MaybeIgnoreError(&status);
      if (!status.ok()) {
        // Ignore the error with current record processing.
        stop_replay = true;
      }
      break;
    }
  }

  if (!process_current_record) {
    return false;
  }

  if (batch_changed) {
    // Make sure that the count in the new batch is
    // within the original count.
    int new_count = WriteBatchInternal::Count(&new_batch);
    int original_count = WriteBatchInternal::Count(&batch);
    if (new_count > original_count) {
      ROCKS_LOG_FATAL(
          immutable_db_options_.info_log,
          "Recovering log #%" PRIu64
          " mode %d log filter %s returned "
          "more records (%d) than original (%d) which is not allowed. "
          "Aborting recovery.",
          wal_number, static_cast<int>(immutable_db_options_.wal_recovery_mode),
          wal_filter.Name(), new_count, original_count);
      status = Status::NotSupported(
          "More than original # of records "
          "returned by Wal Filter ",
          wal_filter.Name());
      return false;
    }
    // Set the same sequence number in the new_batch
    // as the original batch.
    WriteBatchInternal::SetSequence(&new_batch,
                                    WriteBatchInternal::Sequence(&batch));
    batch = new_batch;
  }
  return true;
}

// REQUIRES: wal_numbers are sorted in ascending order
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
                               SequenceNumber* next_sequence, bool read_only,
                               bool* corrupted_wal_found,
                               RecoveryContext* recovery_ctx) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
    void Corruption(size_t bytes, const Status& s) override {
      ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
                     (status == nullptr ? "(ignoring error) " : ""), fname,
                     static_cast<int>(bytes), s.ToString().c_str());
      if (status != nullptr && status->ok()) {
        *status = s;
      }
    }
  };

  mutex_.AssertHeld();
  Status status;
  std::unordered_map<int, VersionEdit> version_edits;
  // no need to refcount because iteration is under mutex
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    version_edits.insert({cfd->GetID(), edit});
  }
  int job_id = next_job_id_.fetch_add(1);
  {
    auto stream = event_logger_.Log();
    stream << "job" << job_id << "event"
           << "recovery_started";
    stream << "wal_files";
    stream.StartArray();
    for (auto wal_number : wal_numbers) {
      stream << wal_number;
    }
    stream.EndArray();
  }

  // No-op for immutable_db_options_.wal_filter == nullptr.
  InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap();

  bool stop_replay_by_wal_filter = false;
  bool stop_replay_for_corruption = false;
  bool flushed = false;
  uint64_t corrupted_wal_number = kMaxSequenceNumber;
  uint64_t min_wal_number = MinLogNumberToKeep();
  if (!allow_2pc()) {
    // In non-2pc mode, we skip WALs that do not back unflushed data.
    min_wal_number =
        std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData());
  }
  for (auto wal_number : wal_numbers) {
    if (wal_number < min_wal_number) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Skipping log #%" PRIu64
                     " since it is older than min log to keep #%" PRIu64,
                     wal_number, min_wal_number);
      continue;
    }
    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(wal_number);
    // Open the log file
    std::string fname =
        LogFileName(immutable_db_options_.GetWalDir(), wal_number);

    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Recovering log #%" PRIu64 " mode %d", wal_number,
                   static_cast<int>(immutable_db_options_.wal_recovery_mode));
    auto logFileDropped = [this, &fname]() {
      uint64_t bytes;
      if (env_->GetFileSize(fname, &bytes).ok()) {
        auto info_log = immutable_db_options_.info_log.get();
        ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
                       static_cast<int>(bytes));
      }
    };
    if (stop_replay_by_wal_filter) {
      logFileDropped();
      continue;
    }

    std::unique_ptr<SequentialFileReader> file_reader;
    {
      std::unique_ptr<FSSequentialFile> file;
      status = fs_->NewSequentialFile(
          fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
      if (!status.ok()) {
        MaybeIgnoreError(&status);
        if (!status.ok()) {
          return status;
        } else {
          // Failed to open this log file, but that's ok.
          // Try the next one.
          continue;
        }
      }
      file_reader.reset(new SequentialFileReader(
          std::move(file), fname, immutable_db_options_.log_readahead_size,
          io_tracer_));
    }

    // Create the log reader.
    LogReporter reporter;
    reporter.env = env_;
    reporter.info_log = immutable_db_options_.info_log.get();
    reporter.fname = fname.c_str();
    if (!immutable_db_options_.paranoid_checks ||
        immutable_db_options_.wal_recovery_mode ==
            WALRecoveryMode::kSkipAnyCorruptedRecords) {
      reporter.status = nullptr;
    } else {
      reporter.status = &status;
    }
    // We intentionally make log::Reader do checksumming even if
    // paranoid_checks==false so that corruptions cause entire commits
    // to be skipped instead of propagating bad information (like overly
    // large sequence numbers).
    log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
                       &reporter, true /*checksum*/, wal_number);

    // Determine if we should tolerate incomplete records at the tail end of
    // the log.
    // Read all the records and add to a memtable.
    std::string scratch;
    Slice record;

    const UnorderedMap<uint32_t, size_t>& running_ts_sz =
        versions_->GetRunningColumnFamiliesTimestampSize();

    TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
                             /*arg=*/nullptr);
    uint64_t record_checksum;
    while (!stop_replay_by_wal_filter &&
           reader.ReadRecord(&record, &scratch,
                             immutable_db_options_.wal_recovery_mode,
                             &record_checksum) &&
           status.ok()) {
      if (record.size() < WriteBatchInternal::kHeader) {
        reporter.Corruption(record.size(),
                            Status::Corruption("log record too small"));
        continue;
      }
      // We create a new batch and initialize with a valid prot_info_ to store
      // the data checksums
      WriteBatch batch;
      std::unique_ptr<WriteBatch> new_batch;

      status = WriteBatchInternal::SetContents(&batch, record);
      if (!status.ok()) {
        return status;
      }

      const UnorderedMap<uint32_t, size_t>& record_ts_sz =
          reader.GetRecordedTimestampSize();
      status = HandleWriteBatchTimestampSizeDifference(
          &batch, running_ts_sz, record_ts_sz,
          TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch);
      if (!status.ok()) {
        return status;
      }

      bool batch_updated = new_batch != nullptr;
      WriteBatch* batch_to_use = batch_updated ? new_batch.get() : &batch;
      TEST_SYNC_POINT_CALLBACK(
          "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch",
          batch_to_use);
      TEST_SYNC_POINT_CALLBACK(
          "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
          &record_checksum);
      status = WriteBatchInternal::UpdateProtectionInfo(
          batch_to_use, 8 /* bytes_per_key */,
          batch_updated ? nullptr : &record_checksum);
      if (!status.ok()) {
        return status;
      }

      SequenceNumber sequence = WriteBatchInternal::Sequence(batch_to_use);

      if (immutable_db_options_.wal_recovery_mode ==
          WALRecoveryMode::kPointInTimeRecovery) {
        // In point-in-time recovery mode, if the sequence ids of log files
        // are consecutive, we continue recovery despite corruption. This
        // could happen when we open and write to a corrupted DB, where the
        // sequence id will start from the last sequence id we recovered.
        if (sequence == *next_sequence) {
          stop_replay_for_corruption = false;
        }
        if (stop_replay_for_corruption) {
          logFileDropped();
          break;
        }
      }

      // For the default case of wal_filter == nullptr, always performs no-op
      // and returns true.
      if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, reporter,
                                              status, stop_replay_by_wal_filter,
                                              *batch_to_use)) {
        continue;
      }

      // If the column family was not found, it might mean that the WAL write
      // batch references a column family that was dropped after the insert.
      // We don't want to fail the whole write batch in that case --
      // we just ignore the update.
      // That's why we set ignore_missing_column_families to true.
      bool has_valid_writes = false;
      status = WriteBatchInternal::InsertInto(
          batch_to_use, column_family_memtables_.get(), &flush_scheduler_,
          &trim_history_scheduler_, true, wal_number, this,
          false /* concurrent_memtable_writes */, next_sequence,
          &has_valid_writes, seq_per_batch_, batch_per_txn_);
      MaybeIgnoreError(&status);
      if (!status.ok()) {
        // We are treating this as a failure while reading since we read valid
        // blocks that do not form coherent data
        reporter.Corruption(record.size(), status);
        continue;
      }

      if (has_valid_writes && !read_only) {
        // We can do this because this is called before the client has access
        // to the DB and there is only a single thread operating on the DB.
        ColumnFamilyData* cfd;

        while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
          cfd->UnrefAndTryDelete();
          // If this asserts, it means that InsertInto failed in
          // filtering updates to already-flushed column families
          assert(cfd->GetLogNumber() <= wal_number);
          auto iter = version_edits.find(cfd->GetID());
          assert(iter != version_edits.end());
          VersionEdit* edit = &iter->second;
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Reflect errors immediately so that conditions like full
            // file-systems cause the DB::Open() to fail.
            return status;
          }
          flushed = true;

          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 *next_sequence);
        }
      }
    }

    if (!status.ok()) {
      if (status.IsNotSupported()) {
        // We should not treat NotSupported as corruption. It is rather a clear
        // sign that we are processing a WAL that is produced by an incompatible
        // version of the code.
        return status;
      }
      if (immutable_db_options_.wal_recovery_mode ==
          WALRecoveryMode::kSkipAnyCorruptedRecords) {
        // We should ignore all errors unconditionally
        status = Status::OK();
      } else if (immutable_db_options_.wal_recovery_mode ==
                 WALRecoveryMode::kPointInTimeRecovery) {
        if (status.IsIOError()) {
          ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                          "IOError during point-in-time reading log #%" PRIu64
                          " seq #%" PRIu64
                          ". %s. This likely means loss of synced WAL, "
                          "thus recovery fails.",
                          wal_number, *next_sequence,
                          status.ToString().c_str());
          return status;
        }
        // We should ignore the error but not continue replaying
        status = Status::OK();
        stop_replay_for_corruption = true;
        corrupted_wal_number = wal_number;
        if (corrupted_wal_found != nullptr) {
          *corrupted_wal_found = true;
        }
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "Point in time recovered to log #%" PRIu64
                       " seq #%" PRIu64,
                       wal_number, *next_sequence);
      } else {
        assert(immutable_db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kTolerateCorruptedTailRecords ||
               immutable_db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kAbsoluteConsistency);
        return status;
      }
    }

    flush_scheduler_.Clear();
    trim_history_scheduler_.Clear();
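    // If any records were replayed from this WAL, advance the version set's
    // sequence counters to the last recovered sequence.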
    auto last_sequence = *next_sequence - 1;
    if ((*next_sequence != kMaxSequenceNumber) &&
        (versions_->LastSequence() <= last_sequence)) {
      versions_->SetLastAllocatedSequence(last_sequence);
      versions_->SetLastPublishedSequence(last_sequence);
      versions_->SetLastSequence(last_sequence);
    }
  }
  // Compare the corrupted log number to all column families' current log
  // numbers. Abort Open() if any column family's log number is greater than
  // the corrupted log number, which means the CF contains data beyond the
  // point of corruption. This could happen during PIT recovery when the WAL
  // is corrupted and some (but not all) CFs are flushed.
  // Exclude the PIT case where no log is dropped after the corruption point.
  // This is to cover the case of empty WALs after the corrupted log, in
  // which we don't reset stop_replay_for_corruption.
  if (stop_replay_for_corruption == true &&
      (immutable_db_options_.wal_recovery_mode ==
           WALRecoveryMode::kPointInTimeRecovery ||
       immutable_db_options_.wal_recovery_mode ==
           WALRecoveryMode::kTolerateCorruptedTailRecords)) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      // One special case causes cfd->GetLogNumber() > corrupted_wal_number
      // while the CF is still consistent: if a new column family is created
      // during the flush and the WAL sync fails at the same time, the new CF
      // points to the new WAL but the old WAL is corrupted. Since the new CF
      // is empty, it is still consistent. We add the check of CF sst file
      // size to avoid a false positive alert.

      // Note that the check of (cfd->GetLiveSstFilesSize() > 0) may lead us
      // to ignore a very rare inconsistency case caused by data cancellation:
      // one CF is empty due to KV deletions, but those operations are in the
      // WAL. If the WAL is corrupted, the status of this CF might not be
      // consistent with the others. However, the consistency check will be
      // bypassed due to the empty CF.
      // TODO: a better and complete implementation is needed to ensure a
      // strict consistency check in WAL recovery, including handling the
      // tailing issues.
      if (cfd->GetLogNumber() > corrupted_wal_number &&
          cfd->GetLiveSstFilesSize() > 0) {
        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                        "Column family inconsistency: SST file contains data"
                        " beyond the point of corruption.");
        return Status::Corruption("SST file is ahead of WALs in CF " +
                                  cfd->GetName());
      }
    }
  }

  // True if there's any data in the WALs; if not, we can skip re-processing
  // them later
  bool data_seen = false;
  if (!read_only) {
    // no need to refcount since client still doesn't have access
    // to the DB and can not drop column families while we iterate
    const WalNumber max_wal_number = wal_numbers.back();
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto iter = version_edits.find(cfd->GetID());
      assert(iter != version_edits.end());
      VersionEdit* edit = &iter->second;

      if (cfd->GetLogNumber() > max_wal_number) {
        // Column family cfd has already flushed the data
        // from all wals. Memtable has to be empty because
        // we filter the updates based on wal_number
        // (in WriteBatch::InsertInto)
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
        assert(edit->NumEntries() == 0);
        continue;
      }

      TEST_SYNC_POINT_CALLBACK(
          "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);

      // flush the final memtable (if non-empty)
      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
        // If a flush happened in the middle of recovery (e.g. due to the
        // memtable filling up), we also flush at the end. Otherwise we would
        // need to record where we were at the last flush, which would
        // complicate the logic.
        if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Recovery failed
            break;
          }
          flushed = true;

          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 versions_->LastSequence());
        }
        data_seen = true;
      }

      // Update the log number info in the version edit corresponding to this
      // column family. Note that the version edits will be written to MANIFEST
      // together later.
      // Writing wal_number in the manifest means that any log file
      // with a number strictly less than (wal_number + 1) is already
      // recovered and should be ignored on next reincarnation.
      // Since we already recovered max_wal_number, we want all WALs
      // with numbers `<= max_wal_number` (including this one) to be ignored.
      if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
        edit->SetLogNumber(max_wal_number + 1);
      }
    }
    if (status.ok()) {
      // We must mark the next log number as used, even though it's
      // not actually used. That is because VersionSet assumes
      // VersionSet::next_file_number_ to always be strictly greater than any
      // log number.
      versions_->MarkFileNumberUsed(max_wal_number + 1);
      assert(recovery_ctx != nullptr);

      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        auto iter = version_edits.find(cfd->GetID());
        assert(iter != version_edits.end());
        recovery_ctx->UpdateVersionEdits(cfd, iter->second);
      }

      if (flushed || !data_seen) {
        VersionEdit wal_deletion;
        if (immutable_db_options_.track_and_verify_wals_in_manifest) {
          wal_deletion.DeleteWalsBefore(max_wal_number + 1);
        }
        if (!allow_2pc()) {
          // In non-2pc mode, flushing the memtables of the column families
          // means we can advance min_log_number_to_keep.
          wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1);
        }
        assert(versions_->GetColumnFamilySet() != nullptr);
        recovery_ctx->UpdateVersionEdits(
            versions_->GetColumnFamilySet()->GetDefault(), wal_deletion);
      }
    }
  }

  if (status.ok()) {
    if (data_seen && !flushed) {
      status = RestoreAliveLogFiles(wal_numbers);
    } else if (!wal_numbers.empty()) {
      // If there's no data in the WAL, or we flushed all the data, still
      // truncate the log file. If the process goes into a crash loop before
      // the file is deleted, the preallocated space will never get freed.
      const bool truncate = !read_only;
      GetLogSizeAndMaybeTruncate(wal_numbers.back(), truncate, nullptr)
          .PermitUncheckedError();
    }
  }

  event_logger_.Log() << "job" << job_id << "event"
                      << "recovery_finished";

  return status;
}

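// Looks up the on-disk size of WAL `wal_number` and, if `truncate` is set,
// truncates the file to that size to release any preallocated space. The
// resulting number/size pair is reported through `log_ptr` when non-null.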
Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
                                          LogFileNumberSize* log_ptr) {
  LogFileNumberSize log(wal_number);
  std::string fname =
      LogFileName(immutable_db_options_.GetWalDir(), wal_number);
  Status s;
  // This gets the apparent size of the WAL, not including preallocated space.
  s = env_->GetFileSize(fname, &log.size);
  TEST_SYNC_POINT_CALLBACK("DBImpl::GetLogSizeAndMaybeTruncate:0", /*arg=*/&s);
  if (s.ok() && truncate) {
    std::unique_ptr<FSWritableFile> last_log;
    Status truncate_status = fs_->ReopenWritableFile(
        fname,
        fs_->OptimizeForLogWrite(
            file_options_,
            BuildDBOptions(immutable_db_options_, mutable_db_options_)),
        &last_log, nullptr);
    if (truncate_status.ok()) {
      truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
    }
    if (truncate_status.ok()) {
      truncate_status = last_log->Close(IOOptions(), nullptr);
    }
    // Failing to truncate is not a critical error.
    if (!truncate_status.ok() && !truncate_status.IsNotSupported()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Failed to truncate log #%" PRIu64 ": %s", wal_number,
                     truncate_status.ToString().c_str());
    }
  }
  if (log_ptr) {
    *log_ptr = log;
  }
  return s;
}

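// When recovery avoided flushing the memtables, re-registers the recovered
// WALs in alive_log_files_ and recomputes total_log_size_ so the WALs stay
// alive until their data is flushed.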
Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
  if (wal_numbers.empty()) {
    return Status::OK();
  }
  Status s;
  mutex_.AssertHeld();
  assert(immutable_db_options_.avoid_flush_during_recovery);
  // Mark these as alive so they'll be considered for deletion later by
  // FindObsoleteFiles()
  total_log_size_ = 0;
  log_empty_ = false;
  uint64_t min_wal_with_unflushed_data =
      versions_->MinLogNumberWithUnflushedData();
  for (auto wal_number : wal_numbers) {
    if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) {
      // In non-2pc mode, the WAL files not backing unflushed data are not
      // alive, thus should not be added to the alive_log_files_.
      continue;
    }
    // We preallocate space for WALs, but after a crash and restart that
    // preallocated space is not needed anymore. It is likely that only the
    // last log has such preallocated space, so we only truncate the last log.
    LogFileNumberSize log;
    s = GetLogSizeAndMaybeTruncate(
        wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log);
    if (!s.ok()) {
      break;
    }
    total_log_size_ += log.size;
    alive_log_files_.push_back(log);
  }
  return s;
}

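// Flushes the contents of `mem` into a brand-new level-0 SST file (plus any
// blob files) and records the result in `edit`. Expects mutex_ to be held on
// entry; the mutex is released for the duration of the table build.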
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                           MemTable* mem, VersionEdit* edit) {
  mutex_.AssertHeld();
  assert(cfd);
  assert(cfd->imm());
  // The immutable memtable list must be empty.
  assert(std::numeric_limits<uint64_t>::max() ==
         cfd->imm()->GetEarliestMemTableID());

  const uint64_t start_micros = immutable_db_options_.clock->NowMicros();

  FileMetaData meta;
  std::vector<BlobFileAddition> blob_file_additions;

  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
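  // Iterate the memtable in total order so that every entry ends up in the
  // new L0 file, even when a prefix extractor is configured.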
  ReadOptions ro;
  ro.total_order_seek = true;
  ro.io_activity = Env::IOActivity::kDBOpen;
  Arena arena;
  Status s;
  TableProperties table_properties;
  {
    ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] [WriteLevel0TableForRecovery]"
                    " Level-0 table #%" PRIu64 ": started",
                    cfd->GetName().c_str(), meta.fd.GetNumber());

    // Get the latest mutable cf options while the mutex is still locked
    const MutableCFOptions mutable_cf_options =
        *cfd->GetLatestMutableCFOptions();
    bool paranoid_file_checks =
        cfd->GetLatestMutableCFOptions()->paranoid_file_checks;

    int64_t _current_time = 0;
    immutable_db_options_.clock->GetCurrentTime(&_current_time)
        .PermitUncheckedError();  // ignore error
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
    meta.oldest_ancester_time = current_time;
    meta.epoch_number = cfd->NewEpochNumber();
    {
      auto write_hint = cfd->CalculateSSTWriteHint(0);
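      // BuildTable below performs file I/O, so release the DB mutex for the
      // duration of the build and re-acquire it afterwards.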
      mutex_.Unlock();

      SequenceNumber earliest_write_conflict_snapshot;
      std::vector<SequenceNumber> snapshot_seqs =
          snapshots_.GetAll(&earliest_write_conflict_snapshot);
      auto snapshot_checker = snapshot_checker_.get();
      if (use_custom_gc_ && snapshot_checker == nullptr) {
        snapshot_checker = DisableGCSnapshotChecker::Instance();
      }
      std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
          range_del_iters;
      auto range_del_iter =
          // This is called during recovery, where a live memtable is flushed
          // directly. In this case, no fragmented tombstone list is cached in
          // this memtable yet.
          mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
                                         false /* immutable_memtable */);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }

      IOStatus io_s;
      TableBuilderOptions tboptions(
          *cfd->ioptions(), mutable_cf_options, cfd->internal_comparator(),
          cfd->int_tbl_prop_collector_factories(),
          GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
          mutable_cf_options.compression_opts, cfd->GetID(), cfd->GetName(),
          0 /* level */, false /* is_bottommost */,
          TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
          0 /* file_creation_time */, db_id_, db_session_id_,
          0 /* target_file_size */, meta.fd.GetNumber());
      SeqnoToTimeMapping empty_seqno_time_mapping;
      Version* version = cfd->current();
      version->Ref();
      const ReadOptions read_option(Env::IOActivity::kDBOpen);
      uint64_t num_input_entries = 0;
      s = BuildTable(
          dbname_, versions_.get(), immutable_db_options_, tboptions,
          file_options_for_compaction_, read_option, cfd->table_cache(),
          iter.get(), std::move(range_del_iters), &meta, &blob_file_additions,
          snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber,
          snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s,
          io_tracer_, BlobFileCreationReason::kRecovery,
          empty_seqno_time_mapping, &event_logger_, job_id, Env::IO_HIGH,
          nullptr /* table_properties */, write_hint,
          nullptr /*full_history_ts_low*/, &blob_callback_, version,
          &num_input_entries);
      version->Unref();
      LogFlush(immutable_db_options_.info_log);
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] [WriteLevel0TableForRecovery]"
                      " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
                      cfd->GetName().c_str(), meta.fd.GetNumber(),
                      meta.fd.GetFileSize(), s.ToString().c_str());
      mutex_.Lock();

      // TODO(AR) is this ok?
      if (!io_s.ok() && s.ok()) {
        s = io_s;
      }

      uint64_t total_num_entries = mem->num_entries();
      if (s.ok() && total_num_entries != num_input_entries) {
        std::string msg = "Expected " + std::to_string(total_num_entries) +
                          " entries in memtable, but read " +
                          std::to_string(num_input_entries);
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] [JOB %d] Level-0 flush during recover: %s",
                       cfd->GetName().c_str(), job_id, msg.c_str());
        if (immutable_db_options_.flush_verify_memtable_count) {
          s = Status::Corruption(msg);
        }
      }
    }
  }
  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  const bool has_output = meta.fd.GetFileSize() > 0;

  constexpr int level = 0;

  if (s.ok() && has_output) {
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
                  meta.fd.smallest_seqno, meta.fd.largest_seqno,
                  meta.marked_for_compaction, meta.temperature,
                  meta.oldest_blob_file_number, meta.oldest_ancester_time,
                  meta.file_creation_time, meta.epoch_number,
                  meta.file_checksum, meta.file_checksum_func_name,
                  meta.unique_id, meta.compensated_range_deletion_size,
                  meta.tail_size, meta.user_defined_timestamps_persisted);

    for (const auto& blob : blob_file_additions) {
      edit->AddBlobFile(blob);
    }
  }

  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;

  if (has_output) {
    stats.bytes_written = meta.fd.GetFileSize();
    stats.num_output_files = 1;
  }

  const auto& blobs = edit->GetBlobFileAdditions();
  for (const auto& blob : blobs) {
    stats.bytes_written_blob += blob.GetTotalBlobBytes();
  }

  stats.num_output_files_blob = static_cast<int>(blobs.size());

  cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
  cfd->internal_stats()->AddCFStats(
      InternalStats::BYTES_FLUSHED,
      stats.bytes_written + stats.bytes_written_blob);
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
  return s;
}

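// Convenience overload that opens the DB with only the default column family
// (plus the hidden persistent-stats column family when persist_stats_to_disk
// is set). A minimal usage sketch (error handling elided; the path is
// illustrative):
//
//   Options options;
//   options.create_if_missing = true;
//   DB* db = nullptr;
//   Status s = DB::Open(options, "/tmp/example_db", &db);
//   if (s.ok()) {
//     // ... use db ...
//     delete db;
//   }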
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  DBOptions db_options(options);
  ColumnFamilyOptions cf_options(options);
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
  if (db_options.persist_stats_to_disk) {
    column_families.push_back(
        ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options));
  }
  std::vector<ColumnFamilyHandle*> handles;
  Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
  if (s.ok()) {
    if (db_options.persist_stats_to_disk) {
      assert(handles.size() == 2);
    } else {
      assert(handles.size() == 1);
    }
    // We can delete the handle since DBImpl always holds a reference to the
    // default column family.
    if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
      delete handles[1];
    }
    delete handles[0];
  }
  return s;
}

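// Opens the DB with an explicit set of column families. On success the
// returned handles correspond one-to-one to `column_families`; the caller
// owns them and should destroy them before deleting the DB.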
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
                const std::vector<ColumnFamilyDescriptor>& column_families,
                std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
  const bool kSeqPerBatch = true;
  const bool kBatchPerTxn = true;
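  // A plain DB::Open() does not assign a sequence number per write batch and
  // treats each batch as a single transaction; the transaction DB variants
  // open with different settings for these flags.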
  ThreadStatusUtil::SetEnableTracking(db_options.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_DBOPEN);
  Status s = DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
                          !kSeqPerBatch, kBatchPerTxn);
  ThreadStatusUtil::ResetThreadStatus();
  return s;
}

// TODO: Implement the trimming in flush code path.
// TODO: Perform trimming before inserting into memtable during recovery.
// TODO: Pick files with max_timestamp > trim_ts by each file's timestamp meta
// info, and handle only these files to reduce io.
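// Opens the DB, then for every column family with user-defined timestamps
// runs a full-range compaction that trims history at `trim_ts`, removing
// data written with timestamps newer than `trim_ts`. On any failure the
// handles are destroyed and the DB is closed before returning.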
Status DB::OpenAndTrimHistory(
    const DBOptions& db_options, const std::string& dbname,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
    std::string trim_ts) {
  assert(dbptr != nullptr);
  assert(handles != nullptr);
  auto validate_options = [&db_options] {
    if (db_options.avoid_flush_during_recovery) {
      return Status::InvalidArgument(
          "avoid_flush_during_recovery incompatible with "
          "OpenAndTrimHistory");
    }
    return Status::OK();
  };
  auto s = validate_options();
  if (!s.ok()) {
    return s;
  }

  DB* db = nullptr;
  s = DB::Open(db_options, dbname, column_families, handles, &db);
  if (!s.ok()) {
    return s;
  }
  assert(db);
  CompactRangeOptions options;
  options.bottommost_level_compaction =
      BottommostLevelCompaction::kForceOptimized;
  auto db_impl = static_cast_with_check<DBImpl>(db);
  for (auto handle : *handles) {
    assert(handle != nullptr);
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(handle);
    auto cfd = cfh->cfd();
    assert(cfd != nullptr);
    // Only compact column families with timestamp enabled
    if (cfd->user_comparator() != nullptr &&
        cfd->user_comparator()->timestamp_size() > 0) {
      s = db_impl->CompactRangeInternal(options, handle, nullptr, nullptr,
                                        trim_ts);
      if (!s.ok()) {
        break;
      }
    }
  }
  auto clean_op = [&handles, &db] {
    for (auto handle : *handles) {
      auto temp_s = db->DestroyColumnFamilyHandle(handle);
      assert(temp_s.ok());
    }
    handles->clear();
    delete db;
  };
  if (!s.ok()) {
    clean_op();
    return s;
  }

  *dbptr = db;
  return s;
}

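// Creates a WAL file numbered `log_file_num`, reusing the file named by
// `recycle_log_number` when that is non-zero, and wraps it in a log::Writer
// returned through `new_log`. The caller takes ownership of *new_log.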
IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
                           size_t preallocate_block_size,
                           log::Writer** new_log) {
  IOStatus io_s;
  std::unique_ptr<FSWritableFile> lfile;

  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  FileOptions opt_file_options =
      fs_->OptimizeForLogWrite(file_options_, db_options);
  std::string wal_dir = immutable_db_options_.GetWalDir();
  std::string log_fname = LogFileName(wal_dir, log_file_num);

  if (recycle_log_number) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "reusing log %" PRIu64 " from recycle list\n",
                   recycle_log_number);
    std::string old_log_fname = LogFileName(wal_dir, recycle_log_number);
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
    io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
                                  &lfile, /*dbg=*/nullptr);
  } else {
    io_s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
  }

  if (io_s.ok()) {
    lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
    lfile->SetPreallocationBlockSize(preallocate_block_size);

    const auto& listeners = immutable_db_options_.listeners;
    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
        std::move(lfile), log_fname, opt_file_options,
        immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
        nullptr, tmp_set.Contains(FileType::kWalFile),
        tmp_set.Contains(FileType::kWalFile)));
    *new_log = new log::Writer(std::move(file_writer), log_file_num,
                               immutable_db_options_.recycle_log_file_num > 0,
                               immutable_db_options_.manual_wal_flush,
                               immutable_db_options_.wal_compression);
    io_s = (*new_log)->AddCompressionTypeRecord();
  }
  return io_s;
}

Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
                    const std::vector<ColumnFamilyDescriptor>& column_families,
                    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
                    const bool seq_per_batch, const bool batch_per_txn) {
  Status s = ValidateOptionsByTable(db_options, column_families);
  if (!s.ok()) {
    return s;
  }

  s = ValidateOptions(db_options, column_families);
  if (!s.ok()) {
    return s;
  }

  *dbptr = nullptr;
  assert(handles);
  handles->clear();

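  // The largest write buffer size across the column families is used below to
  // size the WAL preallocation block and, when an SstFileManager is
  // configured, the reserved disk buffer.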
  size_t max_write_buffer_size = 0;
  for (auto cf : column_families) {
    max_write_buffer_size =
        std::max(max_write_buffer_size, cf.options.write_buffer_size);
  }

  DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
  if (!impl->immutable_db_options_.info_log) {
    s = impl->init_logger_creation_s_;
    delete impl;
    return s;
  } else {
    assert(impl->init_logger_creation_s_.ok());
  }
  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
  if (s.ok()) {
    std::vector<std::string> paths;
    for (auto& db_path : impl->immutable_db_options_.db_paths) {
      paths.emplace_back(db_path.path);
    }
    for (auto& cf : column_families) {
      for (auto& cf_path : cf.options.cf_paths) {
        paths.emplace_back(cf_path.path);
      }
    }
    for (auto& path : paths) {
      s = impl->env_->CreateDirIfMissing(path);
      if (!s.ok()) {
        break;
      }
    }

    // For recovery from NoSpace() error, we can only handle
    // the case where the database is stored in a single path
    if (paths.size() <= 1) {
      impl->error_handler_.EnableAutoRecovery();
    }
  }
  if (s.ok()) {
    s = impl->CreateArchivalDirectory();
  }
  if (!s.ok()) {
    delete impl;
    return s;
  }

  impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
  RecoveryContext recovery_ctx;
  impl->mutex_.Lock();

  // Handles create_if_missing, error_if_exists
  uint64_t recovered_seq(kMaxSequenceNumber);
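  // Recover() reports, through recovered_seq, the sequence number that a
  // dummy WAL entry may need to carry after recovery (see below).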
  s = impl->Recover(column_families, false /* read_only */,
                    false /* error_if_wal_file_exists */,
                    false /* error_if_data_exists_in_wals */, &recovered_seq,
                    &recovery_ctx);
  if (s.ok()) {
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    log::Writer* new_log = nullptr;
    const size_t preallocate_block_size =
        impl->GetWalPreallocateBlockSize(max_write_buffer_size);
    s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,
                        preallocate_block_size, &new_log);
    if (s.ok()) {
      InstrumentedMutexLock wl(&impl->log_write_mutex_);
      impl->logfile_number_ = new_log_number;
      assert(new_log != nullptr);
      assert(impl->logs_.empty());
      impl->logs_.emplace_back(new_log_number, new_log);
    }

    if (s.ok()) {
      impl->alive_log_files_.push_back(
          DBImpl::LogFileNumberSize(impl->logfile_number_));
      // In WritePrepared there could be gaps in the sequence numbers. This
      // breaks the trick we use in kPointInTimeRecovery, which assumes the
      // first seq in the log right after the corrupted log is one larger than
      // the last seq we read from the WALs. To keep this trick working, we
      // add a dummy entry with the expected sequence to the first log right
      // after recovery.
      // In the non-WritePrepared case, the new log after recovery could also
      // be empty, and thus miss the consecutive-seq hint needed to
      // distinguish middle-of-log corruption from a corrupted log left over
      // after recovery. This case is also addressed by the dummy write.
      if (recovered_seq != kMaxSequenceNumber) {
        WriteBatch empty_batch;
        WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
        WriteOptions write_options;
        uint64_t log_used, log_size;
        log::Writer* log_writer = impl->logs_.back().writer;
        LogFileNumberSize& log_file_number_size = impl->alive_log_files_.back();

        assert(log_writer->get_log_number() == log_file_number_size.number);
        impl->mutex_.AssertHeld();
        s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
                             Env::IO_TOTAL, log_file_number_size);
        if (s.ok()) {
          // Need to fsync, otherwise it might get lost after a power reset.
          s = impl->FlushWAL(false);
          TEST_SYNC_POINT_CALLBACK("DBImpl::Open::BeforeSyncWAL", /*arg=*/&s);
          if (s.ok()) {
            s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
          }
        }
      }
    }
  }
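  // Commit the version edits accumulated during recovery (including any WAL
  // tracking metadata) to the MANIFEST.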
  if (s.ok()) {
    s = impl->LogAndApplyForRecovery(recovery_ctx);
  }

  if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
    impl->mutex_.AssertHeld();
    s = impl->InitPersistStatsColumnFamily();
  }

  if (s.ok()) {
    // set column family handles
    for (auto cf : column_families) {
      auto cfd =
          impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
      if (cfd != nullptr) {
        handles->push_back(
            new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
        impl->NewThreadStatusCfInfo(cfd);
      } else {
        if (db_options.create_missing_column_families) {
          // missing column family, create it
          ColumnFamilyHandle* handle = nullptr;
          impl->mutex_.Unlock();
          s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
          impl->mutex_.Lock();
          if (s.ok()) {
            handles->push_back(handle);
          } else {
            break;
          }
        } else {
          s = Status::InvalidArgument("Column family not found", cf.name);
          break;
        }
      }
    }
  }

  if (s.ok()) {
    SuperVersionContext sv_context(/* create_superversion */ true);
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      impl->InstallSuperVersionAndScheduleWork(
          cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
    }
    sv_context.Clean();
  }

  if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
    // try to read format version
    s = impl->PersistentStatsProcessFormatVersion();
  }

  if (s.ok()) {
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      if (!cfd->mem()->IsSnapshotSupported()) {
        impl->is_snapshot_supported_ = false;
      }
      if (cfd->ioptions()->merge_operator != nullptr &&
          !cfd->mem()->IsMergeOperatorSupported()) {
        s = Status::InvalidArgument(
            "The memtable of column family " + cfd->GetName() +
            " does not support merge operator, but its "
            "options.merge_operator is non-null");
      }
      if (!s.ok()) {
        break;
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::Open:Opened");
  Status persist_options_status;
  if (s.ok()) {
    // Persist RocksDB Options before scheduling the compaction.
    // WriteOptionsFile() will release and re-acquire the mutex internally.
    persist_options_status = impl->WriteOptionsFile(
        false /*need_mutex_lock*/, false /*need_enter_write_thread*/);

    *dbptr = impl;
    impl->opened_successfully_ = true;
    impl->DeleteObsoleteFiles();
    TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFiles");
    impl->MaybeScheduleFlushOrCompaction();
  } else {
    persist_options_status.PermitUncheckedError();
  }
  impl->mutex_.Unlock();

  auto sfm = static_cast<SstFileManagerImpl*>(
      impl->immutable_db_options_.sst_file_manager.get());
  if (s.ok() && sfm) {
    // Set Statistics ptr for SstFileManager to dump the stats of
    // DeleteScheduler.
    sfm->SetStatisticsPtr(impl->immutable_db_options_.statistics);
    ROCKS_LOG_INFO(impl->immutable_db_options_.info_log,
                   "SstFileManager instance %p", sfm);

    // Notify SstFileManager about all sst files that already exist in
    // db_paths[0] and cf_paths[0] when the DB is opened.

    // SstFileManagerImpl needs to know sizes of the files. For files whose size
    // we already know (sst files that appear in manifest - typically that's the
    // vast majority of all files), we'll pass the size to SstFileManager.
    // For all other files SstFileManager will query the size from filesystem.

    std::vector<ColumnFamilyMetaData> metadata;
    impl->GetAllColumnFamilyMetaData(&metadata);

    std::unordered_map<std::string, uint64_t> known_file_sizes;
    for (const auto& md : metadata) {
      for (const auto& lmd : md.levels) {
        for (const auto& fmd : lmd.files) {
          known_file_sizes[fmd.relative_filename] = fmd.size;
        }
      }
      for (const auto& bmd : md.blob_files) {
        std::string name = bmd.blob_file_name;
        // The BlobMetaData.blob_file_name may start with "/".
        if (!name.empty() && name[0] == '/') {
          name = name.substr(1);
        }
        known_file_sizes[name] = bmd.blob_file_size;
      }
    }

    std::vector<std::string> paths;
    paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
    for (auto& cf : column_families) {
      if (!cf.options.cf_paths.empty()) {
        paths.emplace_back(cf.options.cf_paths[0].path);
      }
    }
    // Remove duplicate paths.
    std::sort(paths.begin(), paths.end());
    paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
    IOOptions io_opts;
    io_opts.do_not_recurse = true;
    for (auto& path : paths) {
      std::vector<std::string> existing_files;
      impl->immutable_db_options_.fs
          ->GetChildren(path, io_opts, &existing_files,
                        /*IODebugContext*=*/nullptr)
          .PermitUncheckedError();  // TODO: What to do on error?
      for (auto& file_name : existing_files) {
        uint64_t file_number;
        FileType file_type;
        std::string file_path = path + "/" + file_name;
        if (ParseFileName(file_name, &file_number, &file_type) &&
            (file_type == kTableFile || file_type == kBlobFile)) {
          // TODO: Check for errors from OnAddFile?
          if (known_file_sizes.count(file_name)) {
            // We're assuming that each sst file name exists in at most one of
            // the paths.
            sfm->OnAddFile(file_path, known_file_sizes.at(file_name))
                .PermitUncheckedError();
          } else {
            sfm->OnAddFile(file_path).PermitUncheckedError();
          }
        }
      }
    }

    // Reserve some disk buffer space. This is a heuristic: when we run out
    // of disk space, it ensures that there is at least write_buffer_size
    // worth of free space before we resume DB writes. In low-disk-space
    // conditions, we want to avoid a lot of small L0 files due to frequent
    // WAL write failures and the resulting forced flushes.
    sfm->ReserveDiskBuffer(max_write_buffer_size,
                           impl->immutable_db_options_.db_paths[0].path);
  }

  if (s.ok()) {
    ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
                     impl);
    LogFlush(impl->immutable_db_options_.info_log);
    if (!impl->WALBufferIsEmpty()) {
      s = impl->FlushWAL(false);
      if (s.ok()) {
        // Sync is needed otherwise WAL buffered data might get lost after a
        // power reset.
        log::Writer* log_writer = impl->logs_.back().writer;
        s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
      }
    }
    if (s.ok() && !persist_options_status.ok()) {
      s = Status::IOError(
          "DB::Open() failed --- Unable to persist Options file",
          persist_options_status.ToString());
    }
  }
  if (!s.ok()) {
    ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
                   "DB::Open() failed: %s", s.ToString().c_str());
  }
  if (s.ok()) {
    s = impl->StartPeriodicTaskScheduler();
  }

  if (s.ok()) {
    s = impl->RegisterRecordSeqnoTimeWorker();
  }
  if (!s.ok()) {
    for (auto* h : *handles) {
      delete h;
    }
    handles->clear();
    delete impl;
    *dbptr = nullptr;
  }
  return s;
}
}  // namespace ROCKSDB_NAMESPACE