//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include "file/file_util.h"

namespace ROCKSDB_NAMESPACE {
class CfConsistencyStressTest : public StressTest {
 public:
  // Starts the batch id counter at zero.
  CfConsistencyStressTest() : batch_id_{0} {}

  // Nothing to release beyond what the members and base class clean up.
  ~CfConsistencyStressTest() override = default;

  // Always returns false.
  // NOTE(review): presumably tells the framework that this test keeps no
  // per-key expected state - confirm against StressTest.
  bool IsStateTracked() const override {
    return false;
  }

  // Writes the key derived from `rand_keys[0]` to every column family listed
  // in `rand_column_families`, using one WriteBatch so that all of them are
  // updated by a single Write() call.
  //
  // The value is generated from a fresh batch id. Depending on the flags the
  // batch contains Merge, PutEntity or Put operations; the same kind is used
  // for every column family in the batch.
  //
  // Returns the status of the Write() call. Failures are logged and counted as
  // errors; successes are added to the write statistics.
  Status TestPut(ThreadState* thread, WriteOptions& write_opts,
                 const ReadOptions& /* read_opts */,
                 const std::vector<int>& rand_column_families,
                 const std::vector<int64_t>& rand_keys,
                 char (&value)[100]) override {
    assert(!rand_column_families.empty());
    assert(!rand_keys.empty());

    const std::string key_str = Key(rand_keys[0]);

    // Every call takes its own id, which seeds the generated value.
    const uint32_t batch_seed = batch_id_.fetch_add(1);
    const size_t value_len = GenerateValue(batch_seed, value, sizeof(value));
    const Slice value_slice(value, value_len);

    // Merge takes precedence over PutEntity; PutEntity is only picked for
    // every FLAGS_use_put_entity_one_in-th batch id.
    bool write_as_entity = false;
    if (!FLAGS_use_merge && FLAGS_use_put_entity_one_in > 0) {
      write_as_entity = (batch_seed % FLAGS_use_put_entity_one_in) == 0;
    }

    WriteBatch batch;

    for (const int cf_id : rand_column_families) {
      ColumnFamilyHandle* const handle = column_families_[cf_id];
      assert(handle);

      if (FLAGS_use_merge) {
        batch.Merge(handle, key_str, value_slice);
        continue;
      }

      if (write_as_entity) {
        batch.PutEntity(handle, key_str,
                        GenerateWideColumns(batch_seed, value_slice));
        continue;
      }

      batch.Put(handle, key_str, value_slice);
    }

    Status s = db_->Write(write_opts, &batch);

    if (s.ok()) {
      const auto cf_count = static_cast<long>(rand_column_families.size());
      thread->stats.AddBytesForWrites(cf_count, (value_len + 1) * cf_count);
    } else {
      fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    }

    return s;
  }

  // Deletes the key derived from `rand_keys[0]` from every column family
  // listed in `rand_column_families`, using one WriteBatch.
  //
  // Returns the status of the Write() call. Failures are logged and counted as
  // errors; on success one delete per column family is recorded.
  Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
                    const std::vector<int>& rand_column_families,
                    const std::vector<int64_t>& rand_keys) override {
    const std::string key_str = Key(rand_keys[0]);
    const Slice key_slice(key_str);

    WriteBatch batch;
    for (const int cf_id : rand_column_families) {
      batch.Delete(column_families_[cf_id], key_slice);
    }

    Status s = db_->Write(write_opts, &batch);
    if (s.ok()) {
      thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
    } else {
      fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    }
    return s;
  }

  // Issues a DeleteRange from Key(rand_key) to
  // Key(rand_key + FLAGS_range_deletion_width) against every column family
  // listed in `rand_column_families`, using one WriteBatch.
  //
  // `rand_key` starts as `rand_keys[0]`; if the range would extend past the
  // maximum key, a new start key is drawn so that the whole range fits.
  //
  // Returns the status of the Write() call. Failures are logged and counted as
  // errors; on success one range deletion per column family is recorded.
  Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
                         const std::vector<int>& rand_column_families,
                         const std::vector<int64_t>& rand_keys) override {
    assert(thread);
    assert(!rand_column_families.empty());
    assert(!rand_keys.empty());

    int64_t rand_key = rand_keys[0];
    auto shared = thread->shared;
    int64_t max_key = shared->GetMaxKey();
    if (rand_key > max_key - FLAGS_range_deletion_width) {
      rand_key =
          thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
    }
    std::string key_str = Key(rand_key);
    Slice key = key_str;
    std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
    Slice end_key = end_key_str;
    WriteBatch batch;
    // The elements of `rand_column_families` are column family ids, so they
    // index `column_families_` directly (same as TestPut and TestDelete).
    // Using them as positions in `rand_column_families` again is only correct
    // while that list is the identity mapping.
    for (const int cf : rand_column_families) {
      ColumnFamilyHandle* const cfh = column_families_[cf];
      assert(cfh);
      batch.DeleteRange(cfh, key, end_key);
    }
    Status s = db_->Write(write_opts, &batch);
    if (!s.ok()) {
      fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    } else {
      thread->stats.AddRangeDeletions(
          static_cast<long>(rand_column_families.size()));
    }
    return s;
  }

  // Not supported by this test: always reports the reason on stderr and then
  // terminates the process.
  void TestIngestExternalFile(
      ThreadState* /* thread */,
      const std::vector<int>& /* rand_column_families */,
      const std::vector<int64_t>& /* rand_keys */) override {
    // Print the explanation first so that it is also visible in builds where
    // the assertion below aborts the process.
    fprintf(stderr,
            "CfConsistencyStressTest does not support TestIngestExternalFile "
            "because it's not possible to verify the result\n");
    assert(false);
    std::terminate();
  }

  // Reads the key derived from `rand_keys[0]`, choosing one of two modes with
  // equal probability:
  //   - a plain Get() from one randomly chosen column family of
  //     `rand_column_families`, or
  //   - a Get() from every column family in `rand_column_families` under one
  //     snapshot, checking that they all agree (all not found, or all found
  //     with the same value).
  //
  // A disagreement is logged, counted as an error and reported through
  // SetVerificationFailure(). Returns the status of the last Get() issued.
  Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
                 const std::vector<int>& rand_column_families,
                 const std::vector<int64_t>& rand_keys) override {
    assert(thread);
    assert(!rand_column_families.empty());
    assert(!rand_keys.empty());

    std::string key_str = Key(rand_keys[0]);
    Slice key = key_str;
    Status s;
    bool is_consistent = true;

    if (thread->rand.OneIn(2)) {
      // 1/2 chance, does a random read from random CF
      auto cfh =
          column_families_[rand_column_families[thread->rand.Next() %
                                                rand_column_families.size()]];
      std::string from_db;
      s = db_->Get(readoptions, cfh, key, &from_db);
    } else {
      // 1/2 chance, comparing one key is the same across all CFs.
      // The guard releases the snapshot when this scope is left.
      ManagedSnapshot snapshot_guard(db_);
      ReadOptions readoptionscopy = readoptions;
      readoptionscopy.snapshot = snapshot_guard.snapshot();

      // Column family ids actually read; also used to label diagnostics so
      // that the printed names match the handles that were queried.
      const int cf0 = rand_column_families[0];

      std::string value0;
      s = db_->Get(readoptionscopy, column_families_[cf0], key, &value0);
      if (s.ok() || s.IsNotFound()) {
        const bool found = s.ok();
        for (size_t i = 1; i < rand_column_families.size(); i++) {
          const int cfi = rand_column_families[i];

          std::string value1;
          s = db_->Get(readoptionscopy, column_families_[cfi], key, &value1);
          if (!s.ok() && !s.IsNotFound()) {
            // Hard error: stop comparing, reported below via `s`.
            break;
          }
          if (!found && s.ok()) {
            fprintf(stderr, "Get() return different results with key %s\n",
                    Slice(key_str).ToString(true).c_str());
            fprintf(stderr, "CF %s is not found\n",
                    column_family_names_[cf0].c_str());
            fprintf(stderr, "CF %s returns value %s\n",
                    column_family_names_[cfi].c_str(),
                    Slice(value1).ToString(true).c_str());
            is_consistent = false;
          } else if (found && s.IsNotFound()) {
            fprintf(stderr, "Get() return different results with key %s\n",
                    Slice(key_str).ToString(true).c_str());
            fprintf(stderr, "CF %s returns value %s\n",
                    column_family_names_[cf0].c_str(),
                    Slice(value0).ToString(true).c_str());
            fprintf(stderr, "CF %s is not found\n",
                    column_family_names_[cfi].c_str());
            is_consistent = false;
          } else if (s.ok() && value0 != value1) {
            fprintf(stderr, "Get() return different results with key %s\n",
                    Slice(key_str).ToString(true).c_str());
            fprintf(stderr, "CF %s returns value %s\n",
                    column_family_names_[cf0].c_str(),
                    Slice(value0).ToString(true).c_str());
            fprintf(stderr, "CF %s returns value %s\n",
                    column_family_names_[cfi].c_str(),
                    Slice(value1).ToString(true).c_str());
            is_consistent = false;
          }
          if (!is_consistent) {
            break;
          }
        }
      }
    }
    if (!is_consistent) {
      fprintf(stderr, "TestGet error: is_consistent is false\n");
      thread->stats.AddErrors(1);
      // Fail fast to preserve the DB state.
      thread->shared->SetVerificationFailure();
    } else if (s.ok()) {
      thread->stats.AddGets(1, 1);
    } else if (s.IsNotFound()) {
      thread->stats.AddGets(1, 0);
    } else {
      fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    }
    return s;
  }

  // Looks up all keys derived from `rand_keys` with one MultiGet() call against
  // the first column family of `rand_column_families`.
  //
  // Each per-key status is recorded in the statistics as found, not found or
  // error (errors are also logged). Returns the per-key statuses in the same
  // order as `rand_keys`.
  std::vector<Status> TestMultiGet(
      ThreadState* thread, const ReadOptions& read_opts,
      const std::vector<int>& rand_column_families,
      const std::vector<int64_t>& rand_keys) override {
    assert(thread);
    assert(!rand_column_families.empty());

    size_t num_keys = rand_keys.size();
    std::vector<std::string> key_str;
    std::vector<Slice> keys;
    keys.reserve(num_keys);
    // Reserving up front keeps the strings in place, so the Slices stored in
    // `keys` stay valid while more keys are appended.
    key_str.reserve(num_keys);
    std::vector<PinnableSlice> values(num_keys);
    std::vector<Status> statuses(num_keys);
    ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
    ReadOptions readoptionscopy = read_opts;
    readoptionscopy.rate_limiter_priority =
        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;

    for (size_t i = 0; i < num_keys; ++i) {
      key_str.emplace_back(Key(rand_keys[i]));
      keys.emplace_back(key_str.back());
    }
    db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
                  statuses.data());
    // Iterate by reference: the statuses are only inspected, so there is no
    // need to copy each one.
    for (const auto& s : statuses) {
      if (s.ok()) {
        // found case
        thread->stats.AddGets(1, 1);
      } else if (s.IsNotFound()) {
        // not found case
        thread->stats.AddGets(1, 0);
      } else {
        // errors case
        fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
        thread->stats.AddErrors(1);
      }
    }
    return statuses;
  }

  // Reads the key derived from `rand_keys[0]` with GetEntity(), choosing one
  // of two modes with equal probability:
  //   - read from one randomly chosen column family and check the returned
  //     wide columns with VerifyWideColumns(), or
  //   - read from every column family in `rand_column_families` under one
  //     snapshot and check that they all agree (all not found, or all found
  //     with equal entities); the first entity is also checked with
  //     VerifyWideColumns().
  //
  // Any inconsistency is logged, counted as an error and reported through
  // SetVerificationFailure(). Otherwise the last status decides whether the
  // read is counted as found, not found or error.
  void TestGetEntity(ThreadState* thread, const ReadOptions& read_opts,
                     const std::vector<int>& rand_column_families,
                     const std::vector<int64_t>& rand_keys) override {
    assert(thread);
    assert(!rand_column_families.empty());
    assert(!rand_keys.empty());

    const std::string key = Key(rand_keys[0]);

    Status s;
    bool is_consistent = true;

    if (thread->rand.OneIn(2)) {
      // With a 1/2 chance, do a random read from a random CF
      const size_t cf_id = thread->rand.Next() % rand_column_families.size();

      assert(rand_column_families[cf_id] >= 0);
      assert(rand_column_families[cf_id] <
             static_cast<int>(column_families_.size()));

      ColumnFamilyHandle* const cfh =
          column_families_[rand_column_families[cf_id]];
      assert(cfh);

      PinnableWideColumns result;
      s = db_->GetEntity(read_opts, cfh, key, &result);

      if (s.ok()) {
        if (!VerifyWideColumns(result.columns())) {
          fprintf(
              stderr,
              "GetEntity error: inconsistent columns for key %s, entity %s\n",
              StringToHex(key).c_str(),
              WideColumnsToHex(result.columns()).c_str());
          is_consistent = false;
        }
      }
    } else {
      // With a 1/2 chance, compare one key across all CFs
      ManagedSnapshot snapshot_guard(db_);

      ReadOptions read_opts_copy = read_opts;
      read_opts_copy.snapshot = snapshot_guard.snapshot();

      assert(rand_column_families[0] >= 0);
      assert(rand_column_families[0] <
             static_cast<int>(column_families_.size()));

      // Id of the column family every other one is compared against. The same
      // id labels diagnostics so that the printed name matches the handle that
      // was actually read.
      const int cmp_cf = rand_column_families[0];

      PinnableWideColumns cmp_result;
      s = db_->GetEntity(read_opts_copy, column_families_[cmp_cf], key,
                         &cmp_result);

      if (s.ok() || s.IsNotFound()) {
        const bool cmp_found = s.ok();

        if (cmp_found) {
          if (!VerifyWideColumns(cmp_result.columns())) {
            fprintf(stderr,
                    "GetEntity error: inconsistent columns for key %s, "
                    "entity %s\n",
                    StringToHex(key).c_str(),
                    WideColumnsToHex(cmp_result.columns()).c_str());
            is_consistent = false;
          }
        }

        if (is_consistent) {
          for (size_t i = 1; i < rand_column_families.size(); ++i) {
            assert(rand_column_families[i] >= 0);
            assert(rand_column_families[i] <
                   static_cast<int>(column_families_.size()));

            const int cur_cf = rand_column_families[i];

            PinnableWideColumns result;
            s = db_->GetEntity(read_opts_copy, column_families_[cur_cf], key,
                               &result);

            if (!s.ok() && !s.IsNotFound()) {
              // Hard error: stop comparing, reported below via `s`.
              break;
            }

            const bool found = s.ok();

            assert(static_cast<size_t>(cmp_cf) < column_family_names_.size());
            assert(static_cast<size_t>(cur_cf) < column_family_names_.size());

            if (!cmp_found && found) {
              fprintf(stderr,
                      "GetEntity returns different results for key %s: CF %s "
                      "returns not found, CF %s returns entity %s\n",
                      StringToHex(key).c_str(),
                      column_family_names_[cmp_cf].c_str(),
                      column_family_names_[cur_cf].c_str(),
                      WideColumnsToHex(result.columns()).c_str());
              is_consistent = false;
              break;
            }

            if (cmp_found && !found) {
              fprintf(stderr,
                      "GetEntity returns different results for key %s: CF %s "
                      "returns entity %s, CF %s returns not found\n",
                      StringToHex(key).c_str(),
                      column_family_names_[cmp_cf].c_str(),
                      WideColumnsToHex(cmp_result.columns()).c_str(),
                      column_family_names_[cur_cf].c_str());
              is_consistent = false;
              break;
            }

            if (found && result != cmp_result) {
              fprintf(stderr,
                      "GetEntity returns different results for key %s: CF %s "
                      "returns entity %s, CF %s returns entity %s\n",
                      StringToHex(key).c_str(),
                      column_family_names_[cmp_cf].c_str(),
                      WideColumnsToHex(cmp_result.columns()).c_str(),
                      column_family_names_[cur_cf].c_str(),
                      WideColumnsToHex(result.columns()).c_str());
              is_consistent = false;
              break;
            }
          }
        }
      }
    }

    if (!is_consistent) {
      fprintf(stderr, "TestGetEntity error: results are not consistent\n");
      thread->stats.AddErrors(1);
      // Fail fast to preserve the DB state.
      thread->shared->SetVerificationFailure();
    } else if (s.ok()) {
      thread->stats.AddGets(1, 1);
    } else if (s.IsNotFound()) {
      thread->stats.AddGets(1, 0);
    } else {
      fprintf(stderr, "TestGetEntity error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    }
  }

  // For each key derived from `rand_keys`, reads that key from every column
  // family in `rand_column_families` with one MultiGetEntity() call (all under
  // the same snapshot) and checks that the results agree with the first column
  // family: either all not found, or all found with equal columns that also
  // pass VerifyWideColumns().
  //
  // An inconsistency is logged, counted as an error, reported through
  // SetVerificationFailure() and stops processing of the remaining keys. A
  // hard read error is logged and counted as an error for that key only. Keys
  // without problems are counted as found or not found.
  void TestMultiGetEntity(ThreadState* thread, const ReadOptions& read_opts,
                          const std::vector<int>& rand_column_families,
                          const std::vector<int64_t>& rand_keys) override {
    assert(thread);
    assert(thread->shared);
    assert(!rand_column_families.empty());
    assert(!rand_keys.empty());

    ManagedSnapshot snapshot_guard(db_);

    ReadOptions read_opts_copy = read_opts;
    read_opts_copy.snapshot = snapshot_guard.snapshot();

    const size_t num_cfs = rand_column_families.size();

    std::vector<ColumnFamilyHandle*> cfhs;
    cfhs.reserve(num_cfs);

    for (size_t j = 0; j < num_cfs; ++j) {
      assert(rand_column_families[j] >= 0);
      assert(rand_column_families[j] <
             static_cast<int>(column_families_.size()));

      ColumnFamilyHandle* const cfh = column_families_[rand_column_families[j]];
      assert(cfh);

      cfhs.emplace_back(cfh);
    }

    // Id of the column family all others are compared against. Diagnostics
    // are labelled with the ids that were actually read rather than with
    // positions in `rand_column_families`.
    const int cmp_cf = rand_column_families[0];
    assert(static_cast<size_t>(cmp_cf) < column_family_names_.size());

    const size_t num_keys = rand_keys.size();

    for (size_t i = 0; i < num_keys; ++i) {
      const std::string key = Key(rand_keys[i]);

      std::vector<Slice> key_slices(num_cfs, key);
      std::vector<PinnableWideColumns> results(num_cfs);
      std::vector<Status> statuses(num_cfs);

      db_->MultiGetEntity(read_opts_copy, num_cfs, cfhs.data(),
                          key_slices.data(), results.data(), statuses.data());

      bool is_consistent = true;
      // Set when a read fails with something other than NotFound, so that the
      // key is not additionally counted as a successful get below.
      bool has_error = false;

      for (size_t j = 0; j < num_cfs; ++j) {
        const Status& s = statuses[j];
        const Status& cmp_s = statuses[0];
        const WideColumns& columns = results[j].columns();
        const WideColumns& cmp_columns = results[0].columns();

        const int cur_cf = rand_column_families[j];
        assert(static_cast<size_t>(cur_cf) < column_family_names_.size());

        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "TestMultiGetEntity error: %s\n",
                  s.ToString().c_str());
          thread->stats.AddErrors(1);
          has_error = true;
          break;
        }

        // statuses[0] was checked in the first iteration, so it cannot be a
        // hard error here.
        assert(cmp_s.ok() || cmp_s.IsNotFound());

        if (s.IsNotFound()) {
          if (cmp_s.ok()) {
            fprintf(
                stderr,
                "MultiGetEntity returns different results for key %s: CF %s "
                "returns entity %s, CF %s returns not found\n",
                StringToHex(key).c_str(), column_family_names_[cmp_cf].c_str(),
                WideColumnsToHex(cmp_columns).c_str(),
                column_family_names_[cur_cf].c_str());
            is_consistent = false;
            break;
          }

          continue;
        }

        assert(s.ok());
        if (cmp_s.IsNotFound()) {
          fprintf(stderr,
                  "MultiGetEntity returns different results for key %s: CF %s "
                  "returns not found, CF %s returns entity %s\n",
                  StringToHex(key).c_str(),
                  column_family_names_[cmp_cf].c_str(),
                  column_family_names_[cur_cf].c_str(),
                  WideColumnsToHex(columns).c_str());
          is_consistent = false;
          break;
        }

        if (columns != cmp_columns) {
          fprintf(stderr,
                  "MultiGetEntity returns different results for key %s: CF %s "
                  "returns entity %s, CF %s returns entity %s\n",
                  StringToHex(key).c_str(),
                  column_family_names_[cmp_cf].c_str(),
                  WideColumnsToHex(cmp_columns).c_str(),
                  column_family_names_[cur_cf].c_str(),
                  WideColumnsToHex(columns).c_str());
          is_consistent = false;
          break;
        }

        if (!VerifyWideColumns(columns)) {
          fprintf(stderr,
                  "MultiGetEntity error: inconsistent columns for key %s, "
                  "entity %s\n",
                  StringToHex(key).c_str(), WideColumnsToHex(columns).c_str());
          is_consistent = false;
          break;
        }
      }

      if (has_error) {
        // Already counted as an error above; move on to the next key.
        continue;
      }

      if (!is_consistent) {
        fprintf(stderr,
                "TestMultiGetEntity error: results are not consistent\n");
        thread->stats.AddErrors(1);
        // Fail fast to preserve the DB state.
        thread->shared->SetVerificationFailure();
        break;
      } else if (statuses[0].ok()) {
        thread->stats.AddGets(1, 1);
      } else if (statuses[0].IsNotFound()) {
        thread->stats.AddGets(1, 0);
      }
    }
  }

  // Scans all entries sharing a prefix with the key derived from
  // `rand_keys[0]` in one randomly chosen column family of
  // `rand_column_families`.
  //
  // The prefix length is FLAGS_prefix_size, or 7 when that flag is negative.
  // With a 1/2 chance (and only if a next prefix exists) the scan is bounded
  // by `iterate_upper_bound` set to the next prefix. Every visited entry is
  // checked with VerifyWideColumns().
  //
  // Returns OK after recording the number of visited entries, or the first
  // error (inconsistent entry or iterator error), which is also logged and
  // counted.
  Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
                        const std::vector<int>& rand_column_families,
                        const std::vector<int64_t>& rand_keys) override {
    assert(!rand_column_families.empty());
    assert(!rand_keys.empty());

    const std::string seek_key = Key(rand_keys[0]);

    size_t prefix_len = 7;
    if (FLAGS_prefix_size >= 0) {
      prefix_len = static_cast<size_t>(FLAGS_prefix_size);
    }

    const Slice prefix(seek_key.data(), prefix_len);

    std::string next_prefix;
    Slice upper_bound_slice;

    ReadOptions scan_opts = readoptions;

    // The next prefix is always computed because the assertion after the scan
    // needs it; whether it is installed as the upper bound is decided
    // randomly, and only if a next prefix exists.
    const bool has_next_prefix = GetNextPrefix(prefix, &next_prefix);
    if (has_next_prefix && thread->rand.OneIn(2)) {
      upper_bound_slice = Slice(next_prefix);
      scan_opts.iterate_upper_bound = &upper_bound_slice;
    }

    const auto cf_pos =
        thread->rand.Uniform(static_cast<int>(rand_column_families.size()));
    ColumnFamilyHandle* const cfh =
        column_families_[rand_column_families[cf_pos]];
    assert(cfh);

    std::unique_ptr<Iterator> iter(db_->NewIterator(scan_opts, cfh));

    uint64_t num_seen = 0;
    Status s;

    iter->Seek(prefix);
    while (iter->Valid() && iter->key().starts_with(prefix)) {
      ++num_seen;

      if (!VerifyWideColumns(iter->value(), iter->columns())) {
        s = Status::Corruption("Value and columns inconsistent",
                               DebugString(iter->value(), iter->columns()));
        break;
      }

      iter->Next();
    }

    assert(prefix_len == 0 ||
           num_seen <= GetPrefixKeyCount(prefix.ToString(), next_prefix));

    // An inconsistent entry takes precedence over the iterator's own status.
    if (s.ok()) {
      s = iter->status();
    }

    if (s.ok()) {
      thread->stats.AddPrefixes(1, num_seen);
      return Status::OK();
    }

    fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
    thread->stats.AddErrors(1);

    return s;
  }

  // Returns a randomly chosen column family handle; the requested column
  // family id is ignored because all column families should hold the same
  // data.
  ColumnFamilyHandle* GetControlCfh(ThreadState* thread,
                                    int /*column_family_id*/
                                    ) override {
    const size_t pick = thread->rand.Next() % column_families_.size();
    return column_families_[pick];
  }

  // Verifies that all column families contain the same sequence of keys.
  //
  // One iterator per column family is opened on a common snapshot and all of
  // them are advanced in lock step. In every step:
  //   - each valid entry is checked with VerifyWideColumns();
  //   - if no iterator yields a verified entry, the scan ends (reporting any
  //     iterator errors);
  //   - if only some do, the column families differ in length or hit an
  //     error: this is reported as a verification failure;
  //   - otherwise the key of every column family is compared with the key of
  //     the first one, and mismatches are reported together with the internal
  //     key versions around the mismatch.
  // The scan also stops as soon as a verification failure has been flagged.
  void VerifyDb(ThreadState* thread) const override {
    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions options(FLAGS_verify_checksum, true);

    // We must set total_order_seek to true because we are doing a SeekToFirst
    // on a column family whose memtables may support (by default) prefix-based
    // iterator. In this case, NewIterator with options.total_order_seek being
    // false returns a prefix-based iterator. Calling SeekToFirst using this
    // iterator causes the iterator to become invalid. That means we cannot
    // iterate the memtable using this iterator any more, although the memtable
    // contains the most up-to-date key-values.
    options.total_order_seek = true;

    ManagedSnapshot snapshot_guard(db_);
    options.snapshot = snapshot_guard.snapshot();

    const size_t num = column_families_.size();

    std::vector<std::unique_ptr<Iterator>> iters;
    iters.reserve(num);

    for (size_t i = 0; i < num; ++i) {
      iters.emplace_back(db_->NewIterator(options, column_families_[i]));
      iters.back()->SeekToFirst();
    }

    // Per column family status. An entry only becomes non-OK in a step that
    // also ends the loop, so all entries are OK at the start of every step.
    std::vector<Status> statuses(num, Status::OK());

    assert(thread);

    auto shared = thread->shared;
    assert(shared);

    do {
      if (shared->HasVerificationFailedYet()) {
        break;
      }

      // Number of iterators positioned on an entry that passed verification.
      size_t valid_cnt = 0;

      for (size_t i = 0; i < num; ++i) {
        const auto& iter = iters[i];
        assert(iter);

        if (iter->Valid()) {
          if (!VerifyWideColumns(iter->value(), iter->columns())) {
            statuses[i] =
                Status::Corruption("Value and columns inconsistent",
                                   DebugString(iter->value(), iter->columns()));
          } else {
            ++valid_cnt;
          }
        } else {
          statuses[i] = iter->status();
        }
      }

      if (valid_cnt == 0) {
        for (size_t i = 0; i < num; ++i) {
          const auto& s = statuses[i];
          if (!s.ok()) {
            fprintf(stderr, "Iterator on cf %s has error: %s\n",
                    column_families_[i]->GetName().c_str(),
                    s.ToString().c_str());
            shared->SetVerificationFailure();
          }
        }

        break;
      }

      if (valid_cnt < num) {
        shared->SetVerificationFailure();

        for (size_t i = 0; i < num; ++i) {
          assert(iters[i]);

          // Check the recorded status first: an iterator that is still valid
          // but whose entry failed verification carries a Corruption status,
          // which must be reported instead of "has remaining data".
          if (!statuses[i].ok()) {
            fprintf(stderr, "Iterator on cf %s has error: %s\n",
                    column_families_[i]->GetName().c_str(),
                    statuses[i].ToString().c_str());
          } else if (!iters[i]->Valid()) {
            fprintf(stderr, "Finished scanning cf %s\n",
                    column_families_[i]->GetName().c_str());
          } else {
            fprintf(stderr, "cf %s has remaining data to scan\n",
                    column_families_[i]->GetName().c_str());
          }
        }

        break;
      }

      if (shared->HasVerificationFailedYet()) {
        break;
      }

      // If the program reaches here, then all column families' iterators are
      // still valid.
      assert(valid_cnt == num);

      // Retry this step without advancing the iterators.
      // NOTE(review): presumably this returns true while another thread is
      // printing its results - confirm against SharedState.
      if (shared->PrintingVerificationResults()) {
        continue;
      }

      assert(iters[0]);

      const Slice key = iters[0]->key();
      const Slice value = iters[0]->value();

      int num_mismatched_cfs = 0;

      for (size_t i = 1; i < num; ++i) {
        assert(iters[i]);

        const int cmp = key.compare(iters[i]->key());

        if (cmp != 0) {
          ++num_mismatched_cfs;

          // Print the header and the reference entry only once per step.
          if (1 == num_mismatched_cfs) {
            fprintf(stderr, "Verification failed\n");
            fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n",
                    db_->GetLatestSequenceNumber());
            fprintf(stderr, "[%s] %s => %s\n",
                    column_families_[0]->GetName().c_str(),
                    key.ToString(true /* hex */).c_str(),
                    value.ToString(true /* hex */).c_str());
          }

          fprintf(stderr, "[%s] %s => %s\n",
                  column_families_[i]->GetName().c_str(),
                  iters[i]->key().ToString(true /* hex */).c_str(),
                  iters[i]->value().ToString(true /* hex */).c_str());

          // Order the two differing keys so that begin_key <= end_key.
          Slice begin_key;
          Slice end_key;
          if (cmp < 0) {
            begin_key = key;
            end_key = iters[i]->key();
          } else {
            begin_key = iters[i]->key();
            end_key = key;
          }

          // Dumps up to kMaxNumIKeys internal key versions of `cfh` between
          // begin_key and end_key to stderr.
          const auto print_key_versions = [&](ColumnFamilyHandle* cfh) {
            constexpr size_t kMaxNumIKeys = 8;

            std::vector<KeyVersion> versions;
            const Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key,
                                               kMaxNumIKeys, &versions);
            if (!s.ok()) {
              fprintf(stderr, "%s\n", s.ToString().c_str());
              return;
            }

            assert(cfh);

            fprintf(stderr,
                    "Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt
                    ")\n",
                    cfh->GetName().c_str(),
                    begin_key.ToString(true /* hex */).c_str(),
                    end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys);

            for (const KeyVersion& kv : versions) {
              fprintf(stderr, "  key %s seq %" PRIu64 " type %d\n",
                      Slice(kv.user_key).ToString(true).c_str(), kv.sequence,
                      kv.type);
            }
          };

          if (1 == num_mismatched_cfs) {
            print_key_versions(column_families_[0]);
          }

          print_key_versions(column_families_[i]);

          shared->SetVerificationFailure();
        }
      }

      shared->FinishPrintingVerificationResults();

      for (auto& iter : iters) {
        assert(iter);
        iter->Next();
      }
    } while (true);
  }

  // Checks cross column family consistency by comparing checksums.
  //
  // The checksum (crc32c over keys, values and wide columns of a full scan)
  // of the default column family is computed first. If FLAGS_disable_wal is
  // set, the same checksum is computed for every other column family and
  // compared with it. When a comparison DB (`cmp_db_`) exists it is first
  // caught up with the primary and then used for the scans; otherwise the
  // scans run on `db_` under a snapshot.
  //
  // A failed catch-up, a failed scan of a non-default column family, or a
  // checksum mismatch is logged and requests the test to stop.
  void ContinuouslyVerifyDb(ThreadState* thread) const override {
    assert(thread);
    Status status;

    DB* db_ptr = cmp_db_ ? cmp_db_ : db_;
    const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_;

    // Take a snapshot to preserve the state of primary db.
    ManagedSnapshot snapshot_guard(db_);

    SharedState* shared = thread->shared;
    assert(shared);

    if (cmp_db_) {
      status = cmp_db_->TryCatchUpWithPrimary();
      if (!status.ok()) {
        fprintf(stderr, "TryCatchUpWithPrimary: %s\n",
                status.ToString().c_str());
        shared->SetShouldStopTest();
        assert(false);
        return;
      }
    }

    // Scans `iter` from the first entry to the end and stores the crc32c of
    // all keys, values and wide columns in `*checksum` (written even when the
    // scan ends with an error). Returns the iterator's final status.
    const auto checksum_column_family = [](Iterator* iter,
                                           uint32_t* checksum) -> Status {
      assert(nullptr != checksum);

      uint32_t ret = 0;
      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
        ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
        ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());

        for (const auto& column : iter->columns()) {
          ret = crc32c::Extend(ret, column.name().data(), column.name().size());
          ret =
              crc32c::Extend(ret, column.value().data(), column.value().size());
        }
      }

      *checksum = ret;
      return iter->status();
    };
    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions ropts(FLAGS_verify_checksum, true);
    ropts.total_order_seek = true;
    if (nullptr == cmp_db_) {
      ropts.snapshot = snapshot_guard.snapshot();
    }
    uint32_t crc = 0;
    {
      // Compute crc for all key-values of default column family.
      std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
      status = checksum_column_family(it.get(), &crc);
      if (!status.ok()) {
        fprintf(stderr, "Computing checksum of default cf: %s\n",
                status.ToString().c_str());
        assert(false);
      }
    }
    // Since we currently intentionally disallow reading from the secondary
    // instance with snapshot, we cannot achieve cross-cf consistency if WAL is
    // enabled because there is no guarantee that secondary instance replays
    // the primary's WAL to a consistent point where all cfs have the same
    // data.
    if (status.ok() && FLAGS_disable_wal) {
      // Start from the reference checksum: if there is no non-default column
      // family to scan, nothing overwrites this value and no mismatch must be
      // reported. Every scan below overwrites it with its own result.
      uint32_t tmp_crc = crc;
      for (ColumnFamilyHandle* cfh : cfhs) {
        if (cfh == db_ptr->DefaultColumnFamily()) {
          continue;
        }
        std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts, cfh));
        status = checksum_column_family(it.get(), &tmp_crc);
        if (!status.ok() || tmp_crc != crc) {
          break;
        }
      }
      if (!status.ok()) {
        fprintf(stderr, "status: %s\n", status.ToString().c_str());
        shared->SetShouldStopTest();
        assert(false);
      } else if (tmp_crc != crc) {
        fprintf(stderr, "tmp_crc=%" PRIu32 " crc=%" PRIu32 "\n", tmp_crc, crc);
        shared->SetShouldStopTest();
        assert(false);
      }
    }
  }

  // Returns the ids 0, 1, ..., N-1 where N is the number of column families,
  // i.e. every operation targets all column families. Both arguments are
  // ignored.
  std::vector<int> GenerateColumnFamilies(
      const int /* num_column_families */,
      int /* rand_column_family */) const override {
    const int cf_count = static_cast<int>(column_families_.size());

    std::vector<int> cf_ids;
    for (int id = 0; id < cf_count; ++id) {
      cf_ids.push_back(id);
    }
    return cf_ids;
  }

 private:
  // Atomic counter initialized to 0 by the constructor. Each TestPut() call
  // takes the next value via fetch_add(1) and uses it to generate the value
  // to write and to decide whether the batch uses PutEntity.
  std::atomic<uint32_t> batch_id_;
};

// Factory for the cross column family consistency stress test. Returns a
// newly heap-allocated instance; this function does not keep or free the
// pointer.
StressTest* CreateCfConsistencyStressTest() {
  StressTest* const test = new CfConsistencyStressTest();
  return test;
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS