//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in
//  the LICENSE file in the root directory of this source tree. An
//  additional grant of patent rights can be found in the PATENTS file
//  in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <algorithm>
#include <atomic>
#include <string>
#include <thread>
#include <vector>

#include "db/db_impl.h"
#include "db/db_test_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "util/coding.h"
#include "util/fault_injection_test_env.h"
#include "util/options_parser.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"

namespace rocksdb {

// Length of every value produced by ColumnFamilyTest::Value().
static const int kValueSize = 1000;

namespace {
// Convenience wrapper around test::RandomString() that hands the generated
// string back by value instead of through an out-parameter.
std::string RandomString(Random* rnd, int len) {
  std::string generated;
  test::RandomString(rnd, len, &generated);
  return generated;
}
}  // anonymous namespace

// Env wrapper that counts how many times NewWritableFile() is called while
// forwarding every call to the wrapped Env.
class EnvCounter : public EnvWrapper {
 public:
  explicit EnvCounter(Env* base)
      : EnvWrapper(base), num_new_writable_file_(0) {}

  // Returns the number of NewWritableFile() calls observed so far.
  int GetNumberOfNewWritableFileCalls() const {
    return num_new_writable_file_.load();
  }

  // Records the call and then delegates to the wrapped Env.
  Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
                         const EnvOptions& soptions) override {
    ++num_new_writable_file_;
    return EnvWrapper::NewWritableFile(f, r, soptions);
  }

 private:
  // Atomic so the count stays well defined when files are created from more
  // than one thread (presumably the DB's background flush/compaction work
  // goes through this Env as well -- confirm).
  std::atomic<int> num_new_writable_file_;
};

class ColumnFamilyTest : public testing::Test {
 public:
  ColumnFamilyTest() : rnd_(139) {
    env_ = new EnvCounter(Env::Default());
    dbname_ = test::TmpDir() + "/column_family_test";
    db_options_.create_if_missing = true;
    db_options_.fail_if_options_file_error = true;
    db_options_.env = env_;
    DestroyDB(dbname_, Options(db_options_, column_family_options_));
  }

  // Tears the fixture down: closes the DB, turns sync point processing off,
  // destroys the on-disk database and finally frees the counting Env.
  ~ColumnFamilyTest() {
    Close();
    SyncPoint* const sync_point = rocksdb::SyncPoint::GetInstance();
    sync_point->DisableProcessing();
    Destroy();
    delete env_;
  }

  // Returns the value to associate with key index k. The bytes are written
  // into *storage; for k == 0 the returned Slice refers to *storage.
  Slice Value(int k, std::string* storage) {
    if (k != 0) {
      // The key index doubles as the seed of the generator.
      Random seeded(k);
      return test::RandomString(&seeded, kValueSize, storage);
    }
    // Ugh.  Random seed of 0 used to produce no entropy.  This code
    // preserves the implementation that was in place when all of the
    // magic values in this file were picked.
    storage->assign(kValueSize, ' ');
    return Slice(*storage);
  }

  void Build(int base, int n, int flush_every = 0) {
    std::string key_space, value_space;
    WriteBatch batch;

    for (int i = 0; i < n; i++) {
      if (flush_every != 0 && i != 0 && i % flush_every == 0) {
        DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
        dbi->TEST_FlushMemTable();
      }

      int keyi = base + i;
      Slice key(DBTestBase::Key(keyi));

      batch.Clear();
      batch.Put(handles_[0], key, Value(keyi, &value_space));
      batch.Put(handles_[1], key, Value(keyi, &value_space));
      batch.Put(handles_[2], key, Value(keyi, &value_space));
      ASSERT_OK(db_->Write(WriteOptions(), &batch));
    }
  }

  void CheckMissed() {
    uint64_t next_expected = 0;
    uint64_t missed = 0;
    int bad_keys = 0;
    int bad_values = 0;
    int correct = 0;
    std::string value_space;
    for (int cf = 0; cf < 3; cf++) {
      next_expected = 0;
      Iterator* iter = db_->NewIterator(ReadOptions(false, true), handles_[cf]);
      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
        uint64_t key;
        Slice in(iter->key());
        in.remove_prefix(3);
        if (!ConsumeDecimalNumber(&in, &key) || !in.empty() ||
            key < next_expected) {
          bad_keys++;
          continue;
        }
        missed += (key - next_expected);
        next_expected = key + 1;
        if (iter->value() != Value(static_cast<int>(key), &value_space)) {
          bad_values++;
        } else {
          correct++;
        }
      }
      delete iter;
    }

    ASSERT_EQ(0, bad_keys);
    ASSERT_EQ(0, bad_values);
    ASSERT_EQ(0, missed);
    (void)correct;
  }

  void Close() {
    for (auto h : handles_) {
      if (h) {
        db_->DestroyColumnFamilyHandle(h);
      }
    }
    handles_.clear();
    names_.clear();
    delete db_;
    db_ = nullptr;
  }

  // Opens the DB with the column families named in `cf` and returns the
  // status of DB::Open(). When `options` is empty every family uses
  // column_family_options_; otherwise options[i] is used for cf[i].
  // names_ is replaced by the requested names.
  Status TryOpen(std::vector<std::string> cf,
                 std::vector<ColumnFamilyOptions> options = {}) {
    const bool use_fixture_options = options.empty();
    std::vector<ColumnFamilyDescriptor> descriptors;
    names_.clear();
    for (size_t idx = 0; idx < cf.size(); ++idx) {
      const ColumnFamilyOptions& family_options =
          use_fixture_options ? column_family_options_ : options[idx];
      descriptors.push_back(ColumnFamilyDescriptor(cf[idx], family_options));
      names_.push_back(cf[idx]);
    }
    return DB::Open(db_options_, dbname_, descriptors, &handles_, &db_);
  }

  // Read-only counterpart of TryOpen(): builds one descriptor per requested
  // column family (falling back to column_family_options_ when `options` is
  // empty), records the names and returns the status of
  // DB::OpenForReadOnly().
  Status OpenReadOnly(std::vector<std::string> cf,
                         std::vector<ColumnFamilyOptions> options = {}) {
    names_.clear();
    std::vector<ColumnFamilyDescriptor> descriptors;
    size_t position = 0;
    for (const std::string& family_name : cf) {
      if (options.empty()) {
        descriptors.push_back(
            ColumnFamilyDescriptor(family_name, column_family_options_));
      } else {
        descriptors.push_back(
            ColumnFamilyDescriptor(family_name, options[position]));
      }
      names_.push_back(family_name);
      ++position;
    }
    return DB::OpenForReadOnly(db_options_, dbname_, descriptors, &handles_,
                               &db_);
  }

#ifndef ROCKSDB_LITE  // ReadOnlyDB is not supported
  void AssertOpenReadOnly(std::vector<std::string> cf,
                    std::vector<ColumnFamilyOptions> options = {}) {
    ASSERT_OK(OpenReadOnly(cf, options));
  }
#endif  // !ROCKSDB_LITE


  void Open(std::vector<std::string> cf,
            std::vector<ColumnFamilyOptions> options = {}) {
    ASSERT_OK(TryOpen(cf, options));
  }

  void Open() {
    Open({"default"});
  }

  // Returns the open DB as its DBImpl type, giving access to the TEST_*
  // hooks. static_cast is the appropriate cast for a base-to-derived
  // pointer conversion (assumes DBImpl derives from DB, as the TEST_* calls
  // in this fixture rely on).
  DBImpl* dbfull() { return static_cast<DBImpl*>(db_); }

  // Reads the named DB property for column family index `cf` and returns it
  // parsed as a decimal integer. A failed property lookup is reported as a
  // non-fatal test failure.
  int GetProperty(int cf, std::string property) {
    std::string text;
    const bool found = dbfull()->GetProperty(handles_[cf], property, &text);
    EXPECT_TRUE(found);
#ifndef CYGWIN
    return std::stoi(text);
#else
    return std::strtol(text.c_str(), 0 /* off */, 10 /* base */);
#endif
  }

  void Destroy() {
    Close();
    ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_)));
  }

  void CreateColumnFamilies(
      const std::vector<std::string>& cfs,
      const std::vector<ColumnFamilyOptions> options = {}) {
    int cfi = static_cast<int>(handles_.size());
    handles_.resize(cfi + cfs.size());
    names_.resize(cfi + cfs.size());
    for (size_t i = 0; i < cfs.size(); ++i) {
      const auto& current_cf_opt =
          options.size() == 0 ? column_family_options_ : options[i];
      ASSERT_OK(
          db_->CreateColumnFamily(current_cf_opt, cfs[i], &handles_[cfi]));
      names_[cfi] = cfs[i];

#ifndef ROCKSDB_LITE  // RocksDBLite does not support GetDescriptor
      // Verify the CF options of the returned CF handle.
      ColumnFamilyDescriptor desc;
      ASSERT_OK(handles_[cfi]->GetDescriptor(&desc));
      RocksDBOptionsParser::VerifyCFOptions(desc.options, current_cf_opt);
#endif  // !ROCKSDB_LITE
      cfi++;
    }
  }

  void Reopen(const std::vector<ColumnFamilyOptions> options = {}) {
    std::vector<std::string> names;
    for (auto name : names_) {
      if (name != "") {
        names.push_back(name);
      }
    }
    Close();
    assert(options.size() == 0 || names.size() == options.size());
    Open(names, options);
  }

  void CreateColumnFamiliesAndReopen(const std::vector<std::string>& cfs) {
    CreateColumnFamilies(cfs);
    Reopen();
  }

  void DropColumnFamilies(const std::vector<int>& cfs) {
    for (auto cf : cfs) {
      ASSERT_OK(db_->DropColumnFamily(handles_[cf]));
      db_->DestroyColumnFamilyHandle(handles_[cf]);
      handles_[cf] = nullptr;
      names_[cf] = "";
    }
  }

  // Writes `num` random entries to column family index `cf`. Each key comes
  // from test::RandomKey(&rnd_, 11) and each value is a random string of
  // key_value_size - 10 characters. With `save` set, every key is also
  // recorded in keys_.
  void PutRandomData(int cf, int num, int key_value_size, bool save = false) {
    for (int i = 0; i < num; ++i) {
      // Draw the key and the value in separate statements: both consume
      // rnd_, and the evaluation order of function arguments is unspecified,
      // so generating them inside one call made the data depend on the
      // compiler.
      const std::string key = test::RandomKey(&rnd_, 11);
      const std::string value = RandomString(&rnd_, key_value_size - 10);
      if (save) {
        keys_.insert(key);
      }
      ASSERT_OK(Put(cf, key, value));
    }
  }

#ifndef ROCKSDB_LITE  // TEST functions in DB are not supported in lite
  void WaitForFlush(int cf) {
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
  }

  void WaitForCompaction() {
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }

  uint64_t MaxTotalInMemoryState() {
    return dbfull()->TEST_MaxTotalInMemoryState();
  }

  void AssertMaxTotalInMemoryState(uint64_t value) {
    ASSERT_EQ(value, MaxTotalInMemoryState());
  }
#endif  // !ROCKSDB_LITE

  // Writes key -> value into column family index `cf` with default
  // WriteOptions.
  Status Put(int cf, const std::string& key, const std::string& value) {
    const Slice key_slice(key);
    const Slice value_slice(value);
    return db_->Put(WriteOptions(), handles_[cf], key_slice, value_slice);
  }
  // Applies a merge of `value` to `key` in column family index `cf` with
  // default WriteOptions.
  Status Merge(int cf, const std::string& key, const std::string& value) {
    const Slice key_slice(key);
    const Slice value_slice(value);
    return db_->Merge(WriteOptions(), handles_[cf], key_slice, value_slice);
  }
  // Flushes column family index `cf` with default FlushOptions.
  Status Flush(int cf) {
    ColumnFamilyHandle* const handle = handles_[cf];
    return db_->Flush(FlushOptions(), handle);
  }

  std::string Get(int cf, const std::string& key) {
    ReadOptions options;
    options.verify_checksums = true;
    std::string result;
    Status s = db_->Get(options, handles_[cf], Slice(key), &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  void CompactAll(int cf) {
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf], nullptr,
                                nullptr));
  }

  void Compact(int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(
        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
  }

  int NumTableFilesAtLevel(int level, int cf) {
    return GetProperty(cf,
                       "rocksdb.num-files-at-level" + ToString(level));
  }

#ifndef ROCKSDB_LITE
  // Returns the file count of each level of column family index `cf` as a
  // comma-separated list, cut off after the last level that has files.
  // The result is empty when no level has any file.
  std::string FilesPerLevel(int cf) {
    const int num_levels = dbfull()->NumberLevels(handles_[cf]);
    std::string spread;
    size_t keep = 0;  // length of `spread` up to the last non-empty level
    for (int level = 0; level < num_levels; ++level) {
      const int files = NumTableFilesAtLevel(level, cf);
      char piece[100];
      snprintf(piece, sizeof(piece), "%s%d", (level == 0 ? "" : ","), files);
      spread.append(piece);
      if (files > 0) {
        keep = spread.size();
      }
    }
    spread.resize(keep);
    return spread;
  }
#endif

  // Asserts that FilesPerLevel(cf) equals `value`. Does nothing in
  // ROCKSDB_LITE builds.
  void AssertFilesPerLevel(const std::string& value, int cf) {
#ifndef ROCKSDB_LITE
    const std::string actual = FilesPerLevel(cf);
    ASSERT_EQ(value, actual);
#endif
  }

#ifndef ROCKSDB_LITE  // GetLiveFilesMetaData is not supported
  int CountLiveFiles() {
    std::vector<LiveFileMetaData> metadata;
    db_->GetLiveFilesMetaData(&metadata);
    return static_cast<int>(metadata.size());
  }
#endif  // !ROCKSDB_LITE

  void AssertCountLiveFiles(int expected_value) {
#ifndef ROCKSDB_LITE
    ASSERT_EQ(expected_value, CountLiveFiles());
#endif
  }

  // Do n memtable flushes, each of which produces an sstable
  // covering the range [small,large].
  void MakeTables(int cf, int n, const std::string& small,
                  const std::string& large) {
    for (int i = 0; i < n; i++) {
      ASSERT_OK(Put(cf, small, "begin"));
      ASSERT_OK(Put(cf, large, "end"));
      ASSERT_OK(db_->Flush(FlushOptions(), handles_[cf]));
    }
  }

#ifndef ROCKSDB_LITE  // GetSortedWalFiles is not supported
  // Sleeps for 20ms and then returns how many of the WAL files reported by
  // GetSortedWalFiles() have type kAliveLogFile. A listing that still fails
  // after all retries is reported as a non-fatal test failure.
  int CountLiveLogFiles() {
    const int micros_wait_for_log_deletion = 20000;
    env_->SleepForMicroseconds(micros_wait_for_log_deletion);
    VectorLogPtr wal_files;
    Status s;
    // GetSortedWalFiles is a flakey function -- it gets all the wal_dir
    // children files and then later checks for their existence. if some of the
    // log files doesn't exist anymore, it reports an error. it does all of this
    // without DB mutex held, so if a background process deletes the log file
    // while the function is being executed, it returns an error. We retry the
    // function 10 times to avoid the error failing the test
    for (int retries = 0; retries < 10; ++retries) {
      wal_files.clear();
      s = db_->GetSortedWalFiles(wal_files);
      if (s.ok()) {
        break;
      }
    }
    EXPECT_OK(s);
    int ret = 0;
    for (const auto& wal : wal_files) {
      if (wal->Type() == kAliveLogFile) {
        ++ret;
      }
    }
    return ret;
  }
#endif  // !ROCKSDB_LITE

  // Asserts that CountLiveLogFiles() equals `value`. Does nothing in
  // ROCKSDB_LITE builds.
  void AssertCountLiveLogFiles(int value) {
#ifndef ROCKSDB_LITE  // GetSortedWalFiles is not supported
    const int live_logs = CountLiveLogFiles();
    ASSERT_EQ(value, live_logs);
#endif  // !ROCKSDB_LITE
  }

  // Asserts that the "rocksdb.num-immutable-mem-table" property of the i-th
  // column family equals num_per_cf[i]. One entry per handle is required.
  // The property check is skipped in ROCKSDB_LITE builds.
  void AssertNumberOfImmutableMemtables(std::vector<int> num_per_cf) {
    assert(num_per_cf.size() == handles_.size());

#ifndef ROCKSDB_LITE  // GetProperty is not supported in lite
    int cf = 0;
    for (const int expected : num_per_cf) {
      ASSERT_EQ(expected, GetProperty(cf, "rocksdb.num-immutable-mem-table"));
      ++cf;
    }
#endif  // !ROCKSDB_LITE
  }

  void CopyFile(const std::string& source, const std::string& destination,
                uint64_t size = 0) {
    const EnvOptions soptions;
    unique_ptr<SequentialFile> srcfile;
    ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions));
    unique_ptr<WritableFile> destfile;
    ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions));

    if (size == 0) {
      // default argument means copy everything
      ASSERT_OK(env_->GetFileSize(source, &size));
    }

    char buffer[4096];
    Slice slice;
    while (size > 0) {
      uint64_t one = std::min(uint64_t(sizeof(buffer)), size);
      ASSERT_OK(srcfile->Read(one, &slice, buffer));
      ASSERT_OK(destfile->Append(slice));
      size -= slice.size();
    }
    ASSERT_OK(destfile->Close());
  }

  // Handles of the column families of the open DB, addressed by index in the
  // helpers above. The slot of a dropped family is set to nullptr.
  std::vector<ColumnFamilyHandle*> handles_;
  // Column family names, parallel to handles_. The name of a dropped family
  // is set to "".
  std::vector<std::string> names_;
  // Keys written by PutRandomData() when called with save == true.
  std::set<std::string> keys_;
  // Column family options used whenever a helper is not given explicit ones.
  ColumnFamilyOptions column_family_options_;
  // DB-wide options passed to every open.
  DBOptions db_options_;
  // Path of the test database.
  std::string dbname_;
  // The currently open DB, or nullptr when none is open.
  DB* db_ = nullptr;
  // Counting wrapper around Env::Default(); created in the constructor and
  // deleted in the destructor.
  EnvCounter* env_;
  // Random generator (seeded with 139) behind PutRandomData().
  Random rnd_;
};

// Checks that the ID of a dropped column family is not handed out again,
// with different numbers of reopens between the drop and the next create.
TEST_F(ColumnFamilyTest, DontReuseColumnFamilyID) {
  for (int round = 0; round < 3; ++round) {
    Open();
    CreateColumnFamilies({"one", "two", "three"});
    // Every handle's ID is expected to equal its position in handles_.
    for (size_t pos = 0; pos < handles_.size(); ++pos) {
      auto impl = reinterpret_cast<ColumnFamilyHandleImpl*>(handles_[pos]);
      ASSERT_EQ(pos, impl->GetID());
    }
    if (round == 1) {
      Reopen();
    }
    DropColumnFamilies({3});
    Reopen();
    if (round == 2) {
      // this tests if max_column_family is correctly persisted with
      // WriteSnapshot()
      Reopen();
    }
    CreateColumnFamilies({"three2"});
    // ID 3 that was used for dropped column family "three" should not be reused
    auto replacement = reinterpret_cast<ColumnFamilyHandleImpl*>(handles_[3]);
    ASSERT_EQ(4U, replacement->GetID());
    Close();
    Destroy();
  }
}

// ColumnFamilyTest fixture parameterized on a bool that the tests copy into
// DBOptions::allow_2pc.
class FlushEmptyCFTestWithParam : public ColumnFamilyTest,
                                  public testing::WithParamInterface<bool> {
 public:
  FlushEmptyCFTestWithParam() : allow_2pc_(GetParam()) {}

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  // Value of the test parameter.
  bool allow_2pc_;
};

// Simulates a crash after flushing a column family while the default column
// family stays empty, and checks that the latest values are still readable
// after the DB is opened again.
TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  db_options_.env = fault_env.get();
  db_options_.allow_2pc = allow_2pc_;
  Open();
  CreateColumnFamilies({"one", "two"});
  // Generate log file A.
  ASSERT_OK(Put(1, "foo", "v1"));  // seqID 1

  Reopen();
  // Log file A is not dropped after reopening because default column family's
  // min log number is 0.
  // It flushes to SST file X
  ASSERT_OK(Put(1, "foo", "v1"));  // seqID 2
  ASSERT_OK(Put(1, "bar", "v2"));  // seqID 3
  // Current log file is file B now. While flushing, a new log file C is created
  // and is set to current. Both CFs' min log number is set to file C in memory,
  // so after flushing file B is deleted. At the same time, the min log number
  // of default CF is not written to manifest. Log file A still remains.
  // Flushed to SST file Y.
  // The flush results are checked so that a failed flush cannot silently
  // invalidate the scenario described above.
  ASSERT_OK(Flush(1));
  ASSERT_OK(Flush(0));
  ASSERT_OK(Put(1, "bar", "v3"));  // seqID 4
  ASSERT_OK(Put(1, "foo", "v4"));  // seqID 5

  // Preserve file system state up to here to simulate a crash condition.
  fault_env->SetFilesystemActive(false);
  // Remember the live column family names; Close() clears names_.
  std::vector<std::string> names;
  for (const auto& name : names_) {
    if (name != "") {
      names.push_back(name);
    }
  }

  Close();
  fault_env->ResetState();

  // Before opening, there are four files:
  //   Log file A contains seqID 1
  //   Log file C contains seqID 4, 5
  //   SST file X contains seqID 1
  //   SST file Y contains seqID 2, 3
  // Min log number:
  //   default CF: 0
  //   CF one, two: C
  // When opening the DB, all the seqID should be preserved.
  Open(names, {});
  ASSERT_EQ("v4", Get(1, "foo"));
  ASSERT_EQ("v3", Get(1, "bar"));
  Close();

  // fault_env goes away at the end of this function, while the fixture's
  // destructor still uses db_options_; point it back at the fixture's Env.
  db_options_.env = env_;
}

// Simulates a crash after all column families were flushed and more data was
// written, and checks that the data written after the flushes is readable
// after the DB is opened again.
TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest2) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  db_options_.env = fault_env.get();
  db_options_.allow_2pc = allow_2pc_;
  Open();
  CreateColumnFamilies({"one", "two"});
  // Generate log file A.
  ASSERT_OK(Put(1, "foo", "v1"));  // seqID 1

  Reopen();
  // Log file A is not dropped after reopening because default column family's
  // min log number is 0.
  // It flushes to SST file X
  ASSERT_OK(Put(1, "foo", "v1"));  // seqID 2
  ASSERT_OK(Put(1, "bar", "v2"));  // seqID 3
  // Current log file is file B now. While flushing, a new log file C is created
  // and is set to current. Both CFs' min log number is set to file C so after
  // flushing file B is deleted. Log file A still remains.
  // Flushed to SST file Y.
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(0, "bar", "v2"));  // seqID 4
  ASSERT_OK(Put(2, "bar", "v2"));  // seqID 5
  ASSERT_OK(Put(1, "bar", "v3"));  // seqID 6
  // Flushing all column families. This forces all CFs' min log to current. This
  // is written to the manifest file. Log file C is cleared.
  // The flush results are checked so that a failed flush cannot silently
  // invalidate the scenario described here.
  ASSERT_OK(Flush(0));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Flush(2));
  // Write to log file D
  ASSERT_OK(Put(1, "bar", "v4"));  // seqID 7
  ASSERT_OK(Put(1, "bar", "v5"));  // seqID 8
  // Preserve file system state up to here to simulate a crash condition.
  fault_env->SetFilesystemActive(false);
  // Remember the live column family names; Close() clears names_.
  std::vector<std::string> names;
  for (const auto& name : names_) {
    if (name != "") {
      names.push_back(name);
    }
  }

  Close();
  fault_env->ResetState();
  // Before opening, there are two logfiles:
  //   Log file A contains seqID 1
  //   Log file D contains seqID 7, 8
  // Min log number:
  //   default CF: D
  //   CF one, two: D
  // When opening the DB, log file D should be replayed using the seqID
  // specified in the file.
  Open(names, {});
  ASSERT_EQ("v1", Get(1, "foo"));
  ASSERT_EQ("v5", Get(1, "bar"));
  Close();

  // fault_env goes away at the end of this function, while the fixture's
  // destructor still uses db_options_; point it back at the fixture's Env.
  db_options_.env = env_;
}

// Runs every FlushEmptyCFTestWithParam test once per bool value; the fixture
// stores the parameter in allow_2pc_.
INSTANTIATE_TEST_CASE_P(FlushEmptyCFTestWithParam, FlushEmptyCFTestWithParam,
                        ::testing::Bool());

// Creates and drops column families, checks reads and writes in between, and
// finally verifies the set of families listed for the closed DB.
TEST_F(ColumnFamilyTest, AddDrop) {
  Open();
  CreateColumnFamilies({"one", "two", "three"});
  ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
  ASSERT_EQ("NOT_FOUND", Get(2, "fodor"));
  DropColumnFamilies({2});
  ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
  CreateColumnFamilies({"four"});
  ASSERT_EQ("NOT_FOUND", Get(3, "fodor"));
  ASSERT_OK(Put(1, "fodor", "mirko"));
  ASSERT_EQ("mirko", Get(1, "fodor"));
  ASSERT_EQ("NOT_FOUND", Get(3, "fodor"));
  Close();

  // Opening with only "default" is expected to be rejected here.
  const Status default_only = TryOpen({"default"});
  ASSERT_TRUE(default_only.IsInvalidArgument());
  Open({"default", "one", "three", "four"});
  DropColumnFamilies({1});
  Reopen();
  Close();

  std::vector<std::string> listed;
  ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &listed));
  std::sort(listed.begin(), listed.end());
  const std::vector<std::string> expected({"default", "four", "three"});
  ASSERT_TRUE(listed == expected);
}

// Checks that dropping a column family leaves no live files behind.
TEST_F(ColumnFamilyTest, DropTest) {
  // first iteration - dont reopen DB before dropping
  // second iteration - reopen DB before dropping
  for (int round = 0; round < 2; ++round) {
    const bool reopen_before_drop = (round == 1);
    Open({"default"});
    CreateColumnFamiliesAndReopen({"pikachu"});
    for (int n = 0; n < 100; ++n) {
      const std::string number = ToString(n);
      ASSERT_OK(Put(1, number, "bar" + number));
    }
    ASSERT_OK(Flush(1));

    if (reopen_before_drop) {
      Reopen();
    }
    ASSERT_EQ("bar1", Get(1, "1"));

    AssertCountLiveFiles(1);
    DropColumnFamilies({1});
    // make sure that all files are deleted when we drop the column family
    AssertCountLiveFiles(0);
    Destroy();
  }
}

// Checks how a WriteBatch that refers to a dropped column family is handled:
// the write succeeds with ignore_missing_column_families set and is expected
// to be rejected as an invalid argument without it.
TEST_F(ColumnFamilyTest, WriteBatchFailure) {
  Open();
  CreateColumnFamiliesAndReopen({"one", "two"});
  WriteBatch batch;
  batch.Put(handles_[0], Slice("existing"), Slice("column-family"));
  batch.Put(handles_[1], Slice("non-existing"), Slice("column-family"));
  ASSERT_OK(db_->Write(WriteOptions(), &batch));
  DropColumnFamilies({1});

  WriteOptions tolerant_options;
  tolerant_options.ignore_missing_column_families = true;
  batch.Put(handles_[0], Slice("still here"), Slice("column-family"));
  ASSERT_OK(db_->Write(tolerant_options, &batch));
  ASSERT_EQ("column-family", Get(0, "still here"));

  const Status strict_write = db_->Write(WriteOptions(), &batch);
  ASSERT_TRUE(strict_write.IsInvalidArgument());
  Close();
}

// Writes to three column families and checks that each family only sees its
// own keys, both before and after reopening the DB.
TEST_F(ColumnFamilyTest, ReadWrite) {
  Open();
  CreateColumnFamiliesAndReopen({"one", "two"});
  ASSERT_OK(Put(0, "foo", "v1"));
  ASSERT_OK(Put(0, "bar", "v2"));
  ASSERT_OK(Put(1, "mirko", "v3"));
  ASSERT_OK(Put(0, "foo", "v2"));
  ASSERT_OK(Put(2, "fodor", "v5"));

  // Four verification passes; the DB is reopened after the first two.
  for (int pass = 0; pass < 4; ++pass) {
    ASSERT_EQ("v2", Get(0, "foo"));
    ASSERT_EQ("v2", Get(0, "bar"));
    ASSERT_EQ("v3", Get(1, "mirko"));
    ASSERT_EQ("v5", Get(2, "fodor"));
    ASSERT_EQ("NOT_FOUND", Get(0, "fodor"));
    ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
    ASSERT_EQ("NOT_FOUND", Get(2, "foo"));
    if (pass < 2) {
      Reopen();
    }
  }
  Close();
}

// Checks that log files which were already recovered are not applied a
// second time: the WAL files are backed up, copied back after a recovery,
// and the merged counters must still have the same values.
TEST_F(ColumnFamilyTest, IgnoreRecoveredLog) {
  std::string backup_logs = dbname_ + "/backup_logs";

  // delete old files in backup_logs directory
  ASSERT_OK(env_->CreateDirIfMissing(dbname_));
  ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
  std::vector<std::string> old_files;
  // Check the listing and each deletion: stale backup files that survive
  // unnoticed would be copied into the WAL directory further down.
  ASSERT_OK(env_->GetChildren(backup_logs, &old_files));
  for (auto& file : old_files) {
    if (file != "." && file != "..") {
      ASSERT_OK(env_->DeleteFile(backup_logs + "/" + file));
    }
  }

  column_family_options_.merge_operator =
      MergeOperators::CreateUInt64AddOperator();
  db_options_.wal_dir = dbname_ + "/logs";
  Destroy();
  Open();
  CreateColumnFamilies({"cf1", "cf2"});

  // fill up the DB
  std::string one, two, three;
  PutFixed64(&one, 1);
  PutFixed64(&two, 2);
  PutFixed64(&three, 3);
  ASSERT_OK(Merge(0, "foo", one));
  ASSERT_OK(Merge(1, "mirko", one));
  ASSERT_OK(Merge(0, "foo", one));
  ASSERT_OK(Merge(2, "bla", one));
  ASSERT_OK(Merge(2, "fodor", one));
  ASSERT_OK(Merge(0, "bar", one));
  ASSERT_OK(Merge(2, "bla", one));
  ASSERT_OK(Merge(1, "mirko", two));
  ASSERT_OK(Merge(1, "franjo", one));

  // copy the logs to backup
  std::vector<std::string> logs;
  // An unnoticed listing failure would leave `logs` empty and turn the rest
  // of the test into a no-op.
  ASSERT_OK(env_->GetChildren(db_options_.wal_dir, &logs));
  for (auto& log : logs) {
    if (log != ".." && log != ".") {
      CopyFile(db_options_.wal_dir + "/" + log, backup_logs + "/" + log);
    }
  }

  // recover the DB
  Close();

  // 1. check consistency
  // 2. copy the logs from backup back to WAL dir. if the recovery happens
  // again on the same log files, this should lead to incorrect results
  // due to applying merge operator twice
  // 3. check consistency
  for (int iter = 0; iter < 2; ++iter) {
    // assert consistency
    Open({"default", "cf1", "cf2"});
    ASSERT_EQ(two, Get(0, "foo"));
    ASSERT_EQ(one, Get(0, "bar"));
    ASSERT_EQ(three, Get(1, "mirko"));
    ASSERT_EQ(one, Get(1, "franjo"));
    ASSERT_EQ(one, Get(2, "fodor"));
    ASSERT_EQ(two, Get(2, "bla"));
    Close();

    if (iter == 0) {
      // copy the logs from backup back to wal dir
      for (auto& log : logs) {
        if (log != ".." && log != ".") {
          CopyFile(backup_logs + "/" + log, db_options_.wal_dir + "/" + log);
        }
      }
    }
  }
}

#ifndef ROCKSDB_LITE  // TEST functions used are not supported
// Flushes every column family, once while iterators are open and once
// without, checks that MaxTotalInMemoryState() is unchanged by each flush,
// and verifies the data after reopening.
TEST_F(ColumnFamilyTest, FlushTest) {
  Open();
  CreateColumnFamiliesAndReopen({"one", "two"});
  ASSERT_OK(Put(0, "foo", "v1"));
  ASSERT_OK(Put(0, "bar", "v2"));
  ASSERT_OK(Put(1, "mirko", "v3"));
  ASSERT_OK(Put(0, "foo", "v2"));
  ASSERT_OK(Put(2, "fodor", "v5"));

  for (int j = 0; j < 2; j++) {
    ReadOptions ro;
    std::vector<Iterator*> iterators;
    // Hold super version.
    if (j == 0) {
      ASSERT_OK(db_->NewIterators(ro, handles_, &iterators));
    }

    for (int i = 0; i < 3; ++i) {
      uint64_t max_total_in_memory_state =
          MaxTotalInMemoryState();
      // A failed flush must fail the test instead of being ignored.
      ASSERT_OK(Flush(i));
      AssertMaxTotalInMemoryState(max_total_in_memory_state);
    }
    ASSERT_OK(Put(1, "foofoo", "bar"));
    ASSERT_OK(Put(0, "foofoo", "bar"));

    for (auto* it : iterators) {
      delete it;
    }
  }
  Reopen();

  for (int iter = 0; iter <= 2; ++iter) {
    ASSERT_EQ("v2", Get(0, "foo"));
    ASSERT_EQ("v2", Get(0, "bar"));
    ASSERT_EQ("v3", Get(1, "mirko"));
    ASSERT_EQ("v5", Get(2, "fodor"));
    ASSERT_EQ("NOT_FOUND", Get(0, "fodor"));
    ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
    ASSERT_EQ("NOT_FOUND", Get(2, "foo"));
    if (iter <= 1) {
      Reopen();
    }
  }
  Close();
}

// Makes sure that obsolete log files get deleted.
// The test writes to five column families (indices 0-4, although only 0-3
// receive data), waits for flushes and after each step asserts the number of
// live log files. The bracket comments below track which column families
// still have unflushed data in each log file.
TEST_F(ColumnFamilyTest, LogDeletionTest) {
  db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
  column_family_options_.arena_block_size = 4 * 1024;
  column_family_options_.write_buffer_size = 100000;  // 100KB
  Open();
  CreateColumnFamilies({"one", "two", "three", "four"});
  // Each bracket is one log file. if number is in (), it means
  // we don't need it anymore (it's been flushed)
  // []
  AssertCountLiveLogFiles(0);
  PutRandomData(0, 1, 100);
  // [0]
  PutRandomData(1, 1, 100);
  // [0, 1]
  PutRandomData(1, 1000, 100);
  WaitForFlush(1);
  // [0, (1)] [1]
  AssertCountLiveLogFiles(2);
  PutRandomData(0, 1, 100);
  // [0, (1)] [0, 1]
  AssertCountLiveLogFiles(2);
  PutRandomData(2, 1, 100);
  // [0, (1)] [0, 1, 2]
  PutRandomData(2, 1000, 100);
  WaitForFlush(2);
  // [0, (1)] [0, 1, (2)] [2]
  AssertCountLiveLogFiles(3);
  PutRandomData(2, 1000, 100);
  WaitForFlush(2);
  // [0, (1)] [0, 1, (2)] [(2)] [2]
  AssertCountLiveLogFiles(4);
  PutRandomData(3, 1, 100);
  // [0, (1)] [0, 1, (2)] [(2)] [2, 3]
  PutRandomData(1, 1, 100);
  // [0, (1)] [0, 1, (2)] [(2)] [1, 2, 3]
  AssertCountLiveLogFiles(4);
  PutRandomData(1, 1000, 100);
  WaitForFlush(1);
  // [0, (1)] [0, (1), (2)] [(2)] [(1), 2, 3] [1]
  AssertCountLiveLogFiles(5);
  PutRandomData(0, 1000, 100);
  WaitForFlush(0);
  // [(0), (1)] [(0), (1), (2)] [(2)] [(1), 2, 3] [1, (0)] [0]
  // delete obsolete logs -->
  // [(1), 2, 3] [1, (0)] [0]
  AssertCountLiveLogFiles(3);
  PutRandomData(0, 1000, 100);
  WaitForFlush(0);
  // [(1), 2, 3] [1, (0)], [(0)] [0]
  AssertCountLiveLogFiles(4);
  PutRandomData(1, 1000, 100);
  WaitForFlush(1);
  // [(1), 2, 3] [(1), (0)] [(0)] [0, (1)] [1]
  AssertCountLiveLogFiles(5);
  PutRandomData(2, 1000, 100);
  WaitForFlush(2);
  // [(1), (2), 3] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2]
  AssertCountLiveLogFiles(6);
  PutRandomData(3, 1000, 100);
  WaitForFlush(3);
  // [(1), (2), (3)] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2, (3)] [3]
  // delete obsolete logs -->
  // [0, (1)] [1, (2)], [2, (3)] [3]
  AssertCountLiveLogFiles(4);
  Close();
}
#endif  // !ROCKSDB_LITE

// Writes one batch that spans two column families, flushes only the first
// family, simulates a crash that drops unsynced data and checks that both
// families agree on the key afterwards.
TEST_F(ColumnFamilyTest, CrashAfterFlush) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  db_options_.env = fault_env.get();
  Open();
  CreateColumnFamilies({"one"});

  WriteBatch batch;
  batch.Put(handles_[0], Slice("foo"), Slice("bar"));
  batch.Put(handles_[1], Slice("foo"), Slice("bar"));
  ASSERT_OK(db_->Write(WriteOptions(), &batch));
  // The scenario depends on this flush having happened, so its status is
  // checked instead of being dropped.
  ASSERT_OK(Flush(0));
  fault_env->SetFilesystemActive(false);

  // Remember the live column family names; Close() clears names_.
  std::vector<std::string> names;
  for (const auto& name : names_) {
    if (name != "") {
      names.push_back(name);
    }
  }
  Close();
  fault_env->DropUnsyncedFileData();
  fault_env->ResetState();
  Open(names, {});

  // Write batch should be atomic.
  ASSERT_EQ(Get(0, "foo"), Get(1, "foo"));

  Close();
  // fault_env goes away at the end of this function, while the fixture's
  // destructor still uses db_options_; point it back at the fixture's Env.
  db_options_.env = env_;
}

// Opening a DB with a column family name that was never created is expected
// to fail with an invalid-argument status.
TEST_F(ColumnFamilyTest, OpenNonexistentColumnFamily) {
  ASSERT_OK(TryOpen({"default"}));
  Close();
  const Status with_unknown_family = TryOpen({"default", "dne"});
  ASSERT_TRUE(with_unknown_family.IsInvalidArgument());
}

#ifndef ROCKSDB_LITE  // WaitForFlush() is not supported
// Gives four column families different write buffer sizes and different
// min_write_buffer_number_to_merge values, then writes to them step by step
// and asserts the number of immutable memtables per column family and the
// number of live log files after each step.
TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) {
  // disable flushing stale column families
  db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
  Open();
  CreateColumnFamilies({"one", "two", "three"});
  ColumnFamilyOptions default_cf, one, two, three;
  // setup options. all column families have max_write_buffer_number setup to 10
  // "default" -> 100KB memtable, start flushing immediately
  // "one" -> 200KB memtable, start flushing with two immutable memtables
  // "two" -> 1MB memtable, start flushing with three immutable memtables
  // "three" -> 90KB memtable, start flushing with four immutable memtables
  default_cf.write_buffer_size = 100000;
  default_cf.arena_block_size = 4 * 4096;
  default_cf.max_write_buffer_number = 10;
  default_cf.min_write_buffer_number_to_merge = 1;
  default_cf.max_write_buffer_number_to_maintain = 0;
  one.write_buffer_size = 200000;
  one.arena_block_size = 4 * 4096;
  one.max_write_buffer_number = 10;
  one.min_write_buffer_number_to_merge = 2;
  one.max_write_buffer_number_to_maintain = 1;
  two.write_buffer_size = 1000000;
  two.arena_block_size = 4 * 4096;
  two.max_write_buffer_number = 10;
  two.min_write_buffer_number_to_merge = 3;
  two.max_write_buffer_number_to_maintain = 2;
  three.write_buffer_size = 4096 * 22;
  three.arena_block_size = 4096;
  three.max_write_buffer_number = 10;
  three.min_write_buffer_number_to_merge = 4;
  three.max_write_buffer_number_to_maintain = -1;

  Reopen({default_cf, one, two, three});

  // Fixed pause used where the test expects memtables to stay immutable, so
  // there is no flush to wait for.
  int micros_wait_for_flush = 10000;
  PutRandomData(0, 100, 1000);
  WaitForFlush(0);
  AssertNumberOfImmutableMemtables({0, 0, 0, 0});
  AssertCountLiveLogFiles(1);
  PutRandomData(1, 200, 1000);
  env_->SleepForMicroseconds(micros_wait_for_flush);
  AssertNumberOfImmutableMemtables({0, 1, 0, 0});
  AssertCountLiveLogFiles(2);
  PutRandomData(2, 1000, 1000);
  env_->SleepForMicroseconds(micros_wait_for_flush);
  AssertNumberOfImmutableMemtables({0, 1, 1, 0});
  AssertCountLiveLogFiles(3);
  PutRandomData(2, 1000, 1000);
  env_->SleepForMicroseconds(micros_wait_for_flush);
  AssertNumberOfImmutableMemtables({0, 1, 2, 0});
  AssertCountLiveLogFiles(4);
  PutRandomData(3, 93, 990);
  env_->SleepForMicroseconds(micros_wait_for_flush);
  AssertNumberOfImmutableMemtables({0, 1, 2, 1});
  AssertCountLiveLogFiles(5);
  PutRandomData(3, 88, 990);
  env_->SleepForMicroseconds(micros_wait_for_flush);
  AssertNumberOfImmutableMemtables({0, 1, 2, 2});
  AssertCountLiveLogFiles(6);
  PutRandomData(3, 88, 990);
  env_->SleepForMicroseconds(micros_wait_for_flush);
  AssertNumberOfImmutableMemtables({0, 1, 2, 3});
  AssertCountLiveLogFiles(7);
  PutRandomData(0, 100, 1000);
  WaitForFlush(0);
  AssertNumberOfImmutableMemtables({0, 1, 2, 3});
  AssertCountLiveLogFiles(8);
  PutRandomData(2, 100, 10000);
  WaitForFlush(2);
  AssertNumberOfImmutableMemtables({0, 1, 0, 3});
  AssertCountLiveLogFiles(9);
  PutRandomData(3, 88, 990);
  WaitForFlush(3);
  AssertNumberOfImmutableMemtables({0, 1, 0, 0});
  AssertCountLiveLogFiles(10);
  PutRandomData(3, 88, 990);
  env_->SleepForMicroseconds(micros_wait_for_flush);
  AssertNumberOfImmutableMemtables({0, 1, 0, 1});
  AssertCountLiveLogFiles(11);
  PutRandomData(1, 200, 1000);
  WaitForFlush(1);
  AssertNumberOfImmutableMemtables({0, 0, 0, 1});
  AssertCountLiveLogFiles(5);
  PutRandomData(3, 88 * 3, 990);
  WaitForFlush(3);
  PutRandomData(3, 88 * 4, 990);
  WaitForFlush(3);
  AssertNumberOfImmutableMemtables({0, 0, 0, 0});
  AssertCountLiveLogFiles(12);
  PutRandomData(0, 100, 1000);
  WaitForFlush(0);
  AssertNumberOfImmutableMemtables({0, 0, 0, 0});
  AssertCountLiveLogFiles(12);
  PutRandomData(2, 3 * 1000, 1000);
  WaitForFlush(2);
  AssertNumberOfImmutableMemtables({0, 0, 0, 0});
  AssertCountLiveLogFiles(12);
  PutRandomData(1, 2*200, 1000);
  WaitForFlush(1);
  AssertNumberOfImmutableMemtables({0, 0, 0, 0});
  AssertCountLiveLogFiles(7);
  Close();
}
#endif  // !ROCKSDB_LITE

#ifndef ROCKSDB_LITE  // Cuckoo is not supported in lite
TEST_F(ColumnFamilyTest, MemtableNotSupportSnapshot) {
  // Checks that GetSnapshot() starts returning nullptr once a column family
  // backed by a hash-cuckoo memtable exists, and keeps doing so afterwards.
  db_options_.allow_concurrent_memtable_write = false;
  Open();

  // Before any extra column family is added a snapshot can be taken.
  const Snapshot* initial_snapshot = dbfull()->GetSnapshot();
  ASSERT_TRUE(initial_snapshot != nullptr);
  dbfull()->ReleaseSnapshot(initial_snapshot);

  // Column family whose memtable is created by the hash-cuckoo factory.
  ColumnFamilyOptions cuckoo_cf_options;
  cuckoo_cf_options.memtable_factory.reset(
      NewHashCuckooRepFactory(1024 * 1024));
  CreateColumnFamilies({"first"}, {cuckoo_cf_options});
  const Snapshot* snapshot_with_cuckoo = dbfull()->GetSnapshot();
  ASSERT_TRUE(snapshot_with_cuckoo == nullptr);

  // Column family with default options; snapshots remain unavailable.
  ColumnFamilyOptions plain_cf_options;
  CreateColumnFamilies({"second"}, {plain_cf_options});
  const Snapshot* snapshot_with_both = dbfull()->GetSnapshot();
  ASSERT_TRUE(snapshot_with_both == nullptr);

  Close();
}
#endif  // !ROCKSDB_LITE

class TestComparator : public Comparator {
  int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const override {
    return 0;
  }
  const char* Name() const override { return "Test"; }
  void FindShortestSeparator(std::string* start,
                             const rocksdb::Slice& limit) const override {}
  void FindShortSuccessor(std::string* key) const override {}
};

// Two distinct TestComparator instances. The GetComparator test below assigns
// their addresses to column family options and compares the pointers returned
// by the handles against them, so they must be separate objects.
static TestComparator third_comparator;
static TestComparator fourth_comparator;

// Test that we can retrieve the comparator from a created CF
TEST_F(ColumnFamilyTest, GetComparator) {
  Open();

  // A column family created without explicit options: the handle at index 0
  // must report the bytewise comparator.
  CreateColumnFamilies({"first"});
  ASSERT_EQ(handles_[0]->GetComparator(), BytewiseComparator());

  // Create three more column families. Two of the option sets carry a custom
  // comparator; the last one keeps the default.
  ColumnFamilyOptions second_opts, third_opts, fourth_opts;
  second_opts.comparator = &third_comparator;
  third_opts.comparator = &fourth_comparator;
  CreateColumnFamilies({"second", "third", "fourth"},
                       {second_opts, third_opts, fourth_opts});

  // Expected comparator for handles_[1], handles_[2] and handles_[3].
  // NOTE(review): which column family each handles_ slot refers to depends on
  // how the fixture's Open()/CreateColumnFamilies() fill handles_ -- confirm.
  const Comparator* const expected[] = {BytewiseComparator(),
                                        &third_comparator, &fourth_comparator};
  for (size_t idx = 0; idx < 3; ++idx) {
    ASSERT_EQ(handles_[idx + 1]->GetComparator(), expected[idx]);
  }
  Close();
}

TEST_F(ColumnFamilyTest, DifferentMergeOperators) {
  // Three column families with three merge configurations: none for the
  // default family, uint64 addition for "first" and string append for
  // "second". The same Put/Put/Merge sequence is applied to each.
  Open();
  CreateColumnFamilies({"first", "second"});
  ColumnFamilyOptions no_merge_opts, add_opts, append_opts;
  add_opts.merge_operator = MergeOperators::CreateUInt64AddOperator();
  append_opts.merge_operator = MergeOperators::CreateStringAppendOperator();
  Reopen({no_merge_opts, add_opts, append_opts});

  // Fixed64-encoded operands used as values below.
  const auto fixed64 = [](uint64_t value) {
    std::string encoded;
    PutFixed64(&encoded, value);
    return encoded;
  };
  const std::string enc_one = fixed64(1);
  const std::string enc_two = fixed64(2);
  const std::string enc_three = fixed64(3);

  // No merge operator: Merge is rejected and the last Put stays visible.
  ASSERT_OK(Put(0, "foo", enc_two));
  ASSERT_OK(Put(0, "foo", enc_one));
  ASSERT_TRUE(Merge(0, "foo", enc_two).IsNotSupported());
  ASSERT_EQ(Get(0, "foo"), enc_one);

  // UInt64 add operator: 1 merged with 2 reads back as 3.
  ASSERT_OK(Put(1, "foo", enc_two));
  ASSERT_OK(Put(1, "foo", enc_one));
  ASSERT_OK(Merge(1, "foo", enc_two));
  ASSERT_EQ(Get(1, "foo"), enc_three);

  // String append operator: operands are joined with a comma.
  ASSERT_OK(Put(2, "foo", enc_two));
  ASSERT_OK(Put(2, "foo", enc_one));
  ASSERT_OK(Merge(2, "foo", enc_two));
  ASSERT_EQ(Get(2, "foo"), enc_one + "," + enc_two);
  Close();
}

#ifndef ROCKSDB_LITE  // WaitForFlush() is not supported
TEST_F(ColumnFamilyTest, DifferentCompactionStyles) {
  // Runs a universal-style column family ("one") next to a level-style one
  // ("two") in the same DB and checks the file layout of each after its
  // compaction has been triggered.
  Open();
  CreateColumnFamilies({"one", "two"});
  db_options_.max_open_files = 20;  // only 10 files in file cache

  // Default column family: level style, block cache disabled.
  ColumnFamilyOptions default_opts;
  default_opts.compaction_style = kCompactionStyleLevel;
  default_opts.num_levels = 3;
  default_opts.write_buffer_size = 64 << 10;  // 64KB
  default_opts.target_file_size_base = 30 << 10;
  default_opts.max_compaction_bytes = uint64_t{1} << 60;
  BlockBasedTableOptions uncached_table_opts;
  uncached_table_opts.no_block_cache = true;
  default_opts.table_factory.reset(
      NewBlockBasedTableFactory(uncached_table_opts));

  // "one": universal style, compaction triggered at >= 4 files.
  ColumnFamilyOptions universal_opts;
  universal_opts.compaction_style = kCompactionStyleUniversal;
  universal_opts.num_levels = 1;
  universal_opts.level0_file_num_compaction_trigger = 4;
  universal_opts.write_buffer_size = 120000;

  // "two": level style with 4 levels, compaction triggered at 3 L0 files.
  ColumnFamilyOptions level_opts;
  level_opts.compaction_style = kCompactionStyleLevel;
  level_opts.num_levels = 4;
  level_opts.level0_file_num_compaction_trigger = 3;
  level_opts.write_buffer_size = 100000;

  Reopen({default_opts, universal_opts, level_opts});

  // SETUP "one": stop one flush short of the compaction trigger.
  const int universal_setup_files =
      universal_opts.level0_file_num_compaction_trigger - 1;
  for (int flushed = 1; flushed <= universal_setup_files; ++flushed) {
    PutRandomData(1, 10, 12000);
    PutRandomData(1, 1, 10);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(flushed), 1);
  }

  // SETUP "two": likewise, one flush short of its trigger.
  const int level_setup_files =
      level_opts.level0_file_num_compaction_trigger - 1;
  for (int flushed = 1; flushed <= level_setup_files; ++flushed) {
    PutRandomData(2, 10, 12000);
    PutRandomData(2, 1, 10);
    WaitForFlush(2);
    AssertFilesPerLevel(ToString(flushed), 2);
  }

  // One more batch per family to reach each trigger: "one" first...
  PutRandomData(1, 10, 12000);
  PutRandomData(1, 1, 10);

  // ...then "two".
  PutRandomData(2, 10, 12000);
  PutRandomData(2, 1, 10);

  // Let both compactions finish before inspecting the layout.
  WaitForCompaction();

  // "one" is expected to end up with a single file.
  AssertFilesPerLevel("1", 1);

  // "two" is expected to have one file on level 1, before and after an
  // explicit CompactAll().
  AssertFilesPerLevel("0,1", 2);
  CompactAll(2);
  AssertFilesPerLevel("0,1", 2);

  Close();
}
#endif  // !ROCKSDB_LITE

#ifndef ROCKSDB_LITE
// Sync points not supported in RocksDB Lite

// Issues two non-exclusive manual compactions (CompactRange with
// exclusive_manual_compaction = false) from separate threads, one on column
// family 1 and one on column family 2, ordered through sync points. It then
// checks the resulting file layout and that every key in keys_ is still
// readable from column family 1.
TEST_F(ColumnFamilyTest, MultipleManualCompactions) {
  Open();
  CreateColumnFamilies({"one", "two"});
  ColumnFamilyOptions default_cf, one, two;
  db_options_.max_open_files = 20;  // only 10 files in file cache
  db_options_.max_background_compactions = 3;

  default_cf.compaction_style = kCompactionStyleLevel;
  default_cf.num_levels = 3;
  default_cf.write_buffer_size = 64 << 10;  // 64KB
  default_cf.target_file_size_base = 30 << 10;
  default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));

  one.compaction_style = kCompactionStyleUniversal;

  one.num_levels = 1;
  // trigger compaction if there are >= 4 files
  one.level0_file_num_compaction_trigger = 4;
  one.write_buffer_size = 120000;

  two.compaction_style = kCompactionStyleLevel;
  two.num_levels = 4;
  two.level0_file_num_compaction_trigger = 3;
  two.write_buffer_size = 100000;

  Reopen({default_cf, one, two});

  // SETUP column family "one" -- universal style
  // Only trigger - 2 files are written, i.e. fewer than the configured
  // level0_file_num_compaction_trigger.
  // (The trailing `true` passed to PutRandomData presumably records the keys
  // in keys_ for the final check -- confirm in the fixture.)
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(i + 1), 1);
  }
  // Set until the callback below has handled its first invocation.
  // NOTE(review): the callback captures this flag by reference and this test
  // body never clears the callback or disables sync-point processing;
  // presumably the fixture teardown does -- confirm. The flag is also a plain
  // bool that may be touched from background compaction threads.
  bool cf_1_1 = true;
  // Sync-point ordering (pairs are {predecessor, successor}; presumably the
  // successor blocks until the predecessor has been reached -- confirm in
  // util/sync_point.h):
  //   4 -> 1: the second thread starts its CompactRange only after the first
  //           non-trivial compaction has run.
  //   2 -> 5: the main thread proceeds only after the second CompactRange
  //           has returned.
  //   2 -> 3: the first compaction's callback is held at :3 until the second
  //           CompactRange has returned.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"ColumnFamilyTest::MultiManual:4", "ColumnFamilyTest::MultiManual:1"},
       {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:5"},
       {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:3"}});
  // Only the first invocation of this callback hits the :4 and :3 points;
  // later invocations do nothing.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
        if (cf_1_1) {
          TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:4");
          cf_1_1 = false;
          TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:3");
        }
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  std::vector<port::Thread> threads;
  // First manual compaction: column family 1.
  threads.emplace_back([&] {
    CompactRangeOptions compact_options;
    compact_options.exclusive_manual_compaction = false;
    ASSERT_OK(
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
  });

  // SETUP column family "two" -- level style with 4 levels
  for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) {
    PutRandomData(2, 10, 12000);
    PutRandomData(2, 1, 10);
    WaitForFlush(2);
    AssertFilesPerLevel(ToString(i + 1), 2);
  }
  // Second manual compaction: column family 2, bracketed by the :1 and :2
  // sync points.
  threads.emplace_back([&] {
    TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:1");
    CompactRangeOptions compact_options;
    compact_options.exclusive_manual_compaction = false;
    ASSERT_OK(
        db_->CompactRange(compact_options, handles_[2], nullptr, nullptr));
    TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:2");
  });

  TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:5");
  for (auto& t : threads) {
    t.join();
  }

  // VERIFY compaction "one"
  AssertFilesPerLevel("1", 1);

  // VERIFY compaction "two"
  AssertFilesPerLevel("0,1", 2);
  CompactAll(2);
  AssertFilesPerLevel("0,1", 2);
  // Compare against saved keys
  // Every key in keys_ must still be found in column family 1.
  std::set<std::string>::iterator key_iter = keys_.begin();
  while (key_iter != keys_.end()) {
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
    key_iter++;
  }
  Close();
}

// Fills column family 1 up to its level0_file_num_compaction_trigger so that
// a compaction is triggered without an explicit request, then issues a
// non-exclusive manual CompactRange on column family 2 from a separate
// thread. Sync points order the two; the test finally checks the file layout
// and that every key in keys_ is still readable from column family 1.
TEST_F(ColumnFamilyTest, AutomaticAndManualCompactions) {
  Open();
  CreateColumnFamilies({"one", "two"});
  ColumnFamilyOptions default_cf, one, two;
  db_options_.max_open_files = 20;  // only 10 files in file cache
  db_options_.max_background_compactions = 3;
  db_options_.base_background_compactions = 3;

  default_cf.compaction_style = kCompactionStyleLevel;
  default_cf.num_levels = 3;
  default_cf.write_buffer_size = 64 << 10;  // 64KB
  default_cf.target_file_size_base = 30 << 10;
  default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));

  one.compaction_style = kCompactionStyleUniversal;

  one.num_levels = 1;
  // trigger compaction if there are >= 4 files
  one.level0_file_num_compaction_trigger = 4;
  one.write_buffer_size = 120000;

  two.compaction_style = kCompactionStyleLevel;
  two.num_levels = 4;
  two.level0_file_num_compaction_trigger = 3;
  two.write_buffer_size = 100000;

  Reopen({default_cf, one, two});

  // Set until the callback below has handled its first invocation.
  bool cf_1_1 = true;
  // Sync-point ordering (pairs are {predecessor, successor}; presumably the
  // successor blocks until the predecessor has been reached -- confirm in
  // util/sync_point.h):
  //   4 -> 1: the main thread continues past :1 only after the first
  //           non-trivial compaction has run.
  //   2 -> 5: the main thread continues past :5 only after the manual
  //           CompactRange has returned.
  //   2 -> 3: the first compaction's callback is held at :3 until the manual
  //           CompactRange has returned.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:1"},
       {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:5"},
       {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:3"}});
  // Only the first invocation hits the sync points; the flag is cleared
  // before them, so later invocations skip the block.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
        if (cf_1_1) {
          cf_1_1 = false;
          TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4");
          TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3");
        }
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  // SETUP column family "one" -- universal style
  // Writes exactly level0_file_num_compaction_trigger files.
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(i + 1), 1);
  }

  TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1");

  // SETUP column family "two" -- level style with 4 levels
  // Only trigger - 2 files are written, fewer than the configured trigger.
  for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) {
    PutRandomData(2, 10, 12000);
    PutRandomData(2, 1, 10);
    WaitForFlush(2);
    AssertFilesPerLevel(ToString(i + 1), 2);
  }
  // Manual compaction of column family 2; hits :2 once CompactRange returns.
  rocksdb::port::Thread threads([&] {
    CompactRangeOptions compact_options;
    compact_options.exclusive_manual_compaction = false;
    ASSERT_OK(
        db_->CompactRange(compact_options, handles_[2], nullptr, nullptr));
    TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2");
  });

  TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5");
  threads.join();

  // WAIT for compactions
  WaitForCompaction();

  // VERIFY compaction "one"
  AssertFilesPerLevel("1", 1);

  // VERIFY compaction "two"
  AssertFilesPerLevel("0,1", 2);
  CompactAll(2);
  AssertFilesPerLevel("0,1", 2);
  // Compare against saved keys
  // Every key in keys_ must still be found in column family 1.
  std::set<std::string>::iterator key_iter = keys_.begin();
  while (key_iter != keys_.end()) {
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
    key_iter++;
  }
  Close();
}

// Starts a non-exclusive manual CompactRange on column family 1 from a
// separate thread, then fills column family 2 up to its
// level0_file_num_compaction_trigger so that a compaction is triggered
// without an explicit request. Sync points order the two; the test finally
// checks the file layout and that every key in keys_ is still readable from
// column family 1.
TEST_F(ColumnFamilyTest, ManualAndAutomaticCompactions) {
  Open();
  CreateColumnFamilies({"one", "two"});
  ColumnFamilyOptions default_cf, one, two;
  db_options_.max_open_files = 20;  // only 10 files in file cache
  db_options_.max_background_compactions = 3;
  db_options_.base_background_compactions = 3;

  default_cf.compaction_style = kCompactionStyleLevel;
  default_cf.num_levels = 3;
  default_cf.write_buffer_size = 64 << 10;  // 64KB
  default_cf.target_file_size_base = 30 << 10;
  default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));

  one.compaction_style = kCompactionStyleUniversal;

  one.num_levels = 1;
  // trigger compaction if there are >= 4 files
  one.level0_file_num_compaction_trigger = 4;
  one.write_buffer_size = 120000;

  two.compaction_style = kCompactionStyleLevel;
  two.num_levels = 4;
  two.level0_file_num_compaction_trigger = 3;
  two.write_buffer_size = 100000;

  Reopen({default_cf, one, two});

  // SETUP column family "one" -- universal style
  // Only trigger - 2 files are written, fewer than the configured trigger.
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(i + 1), 1);
  }
  // cf_1_1 selects the first callback invocation, cf_1_2 the second.
  bool cf_1_1 = true;
  bool cf_1_2 = true;
  // Sync-point ordering (pairs are {predecessor, successor}; presumably the
  // successor blocks until the predecessor has been reached -- confirm in
  // util/sync_point.h):
  //   4 -> 1: the main thread continues past :1 only after the first
  //           non-trivial compaction has run.
  //   5 -> 2: the second callback invocation is held at :2 until the main
  //           thread has finished writing column family 2.
  //   2 -> 3: the first callback invocation is held at :3 until the second
  //           invocation has passed :2.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:1"},
       {"ColumnFamilyTest::ManualAuto:5", "ColumnFamilyTest::ManualAuto:2"},
       {"ColumnFamilyTest::ManualAuto:2", "ColumnFamilyTest::ManualAuto:3"}});
  // First invocation: hits :4, clears cf_1_1, then hits :3.
  // Second invocation: hits :2, then clears cf_1_2. Later ones do nothing.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
        if (cf_1_1) {
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
          cf_1_1 = false;
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
        } else if (cf_1_2) {
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
          cf_1_2 = false;
        }
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  // Manual compaction of column family 1.
  rocksdb::port::Thread threads([&] {
    CompactRangeOptions compact_options;
    compact_options.exclusive_manual_compaction = false;
    ASSERT_OK(
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
  });

  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");

  // SETUP column family "two" -- level style with 4 levels
  // Writes exactly level0_file_num_compaction_trigger files.
  for (int i = 0; i < two.level0_file_num_compaction_trigger; ++i) {
    PutRandomData(2, 10, 12000);
    PutRandomData(2, 1, 10);
    WaitForFlush(2);
    AssertFilesPerLevel(ToString(i + 1), 2);
  }
  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
  threads.join();

  // WAIT for compactions
  WaitForCompaction();

  // VERIFY compaction "one"
  AssertFilesPerLevel("1", 1);

  // VERIFY compaction "two"
  AssertFilesPerLevel("0,1", 2);
  CompactAll(2);
  AssertFilesPerLevel("0,1", 2);
  // Compare against saved keys
  // Every key in keys_ must still be found in column family 1.
  std::set<std::string>::iterator key_iter = keys_.begin();
  while (key_iter != keys_.end()) {
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
    key_iter++;
  }
  Close();
}

// Issues two manual compactions on the same column family (1) from two
// threads: the first with exclusive_manual_compaction = true, the second
// with it set to false, with more files written in between. Sync points
// order the steps; the test finally checks that at most two level-0 files
// remain and that every key in keys_ is still readable.
TEST_F(ColumnFamilyTest, SameCFManualManualCompactions) {
  Open();
  CreateColumnFamilies({"one"});
  ColumnFamilyOptions default_cf, one;
  db_options_.max_open_files = 20;  // only 10 files in file cache
  db_options_.max_background_compactions = 3;
  db_options_.base_background_compactions = 3;

  default_cf.compaction_style = kCompactionStyleLevel;
  default_cf.num_levels = 3;
  default_cf.write_buffer_size = 64 << 10;  // 64KB
  default_cf.target_file_size_base = 30 << 10;
  default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));

  one.compaction_style = kCompactionStyleUniversal;

  one.num_levels = 1;
  // trigger compaction if there are >= 4 files
  one.level0_file_num_compaction_trigger = 4;
  one.write_buffer_size = 120000;

  Reopen({default_cf, one});

  // SETUP column family "one" -- universal style
  // Only trigger - 2 files are written, fewer than the configured trigger.
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(i + 1), 1);
  }
  // cf_1_1 selects the first callback invocation, cf_1_2 the second.
  bool cf_1_1 = true;
  bool cf_1_2 = true;
  // Sync-point ordering (pairs are {predecessor, successor}; presumably the
  // successor blocks until the predecessor has been reached -- confirm in
  // util/sync_point.h):
  //   4 -> 2 and 1 -> 2: the second callback invocation is held at :2 until
  //           both :4 and :1 have been reached.
  //   4 -> 5: the main thread continues past :5 only after the first
  //           non-trivial compaction has run.
  //   1 -> 3: the first callback invocation is held at :3 until the main
  //           thread has started the second manual compaction and hit :1.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:2"},
       {"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:5"},
       {"ColumnFamilyTest::ManualManual:1", "ColumnFamilyTest::ManualManual:2"},
       {"ColumnFamilyTest::ManualManual:1",
        "ColumnFamilyTest::ManualManual:3"}});
  // First invocation: hits :4, clears cf_1_1, then hits :3.
  // Second invocation: hits :2, then clears cf_1_2. Later ones do nothing.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
        if (cf_1_1) {
          TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:4");
          cf_1_1 = false;
          TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:3");
        } else if (cf_1_2) {
          TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:2");
          cf_1_2 = false;
        }
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  // First manual compaction: exclusive.
  rocksdb::port::Thread threads([&] {
    CompactRangeOptions compact_options;
    compact_options.exclusive_manual_compaction = true;
    ASSERT_OK(
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
  });

  TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:5");

  WaitForFlush(1);

  // Add more L0 files and force another manual compaction
  // The file count asserted here starts at level0_file_num_compaction_trigger
  // and grows by one per iteration.
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
                        1);
  }

  // Second manual compaction on the same column family: non-exclusive.
  rocksdb::port::Thread threads1([&] {
    CompactRangeOptions compact_options;
    compact_options.exclusive_manual_compaction = false;
    ASSERT_OK(
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
  });

  TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:1");

  threads.join();
  threads1.join();
  WaitForCompaction();
  // VERIFY compaction "one"
  ASSERT_LE(NumTableFilesAtLevel(0, 1), 2);

  // Compare against saved keys
  // Every key in keys_ must still be found in column family 1.
  std::set<std::string>::iterator key_iter = keys_.begin();
  while (key_iter != keys_.end()) {
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
    key_iter++;
  }
  Close();
}

// Starts a non-exclusive manual CompactRange on column family 1 (universal
// style) from a separate thread, then writes level0_file_num_compaction_trigger
// more files to the same column family so that a further compaction is
// triggered without an explicit request. Sync points order the steps; the
// test finally checks that at most two level-0 files remain and that every
// key in keys_ is still readable.
TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) {
  Open();
  CreateColumnFamilies({"one"});
  ColumnFamilyOptions default_cf, one;
  db_options_.max_open_files = 20;  // only 10 files in file cache
  db_options_.max_background_compactions = 3;
  db_options_.base_background_compactions = 3;

  default_cf.compaction_style = kCompactionStyleLevel;
  default_cf.num_levels = 3;
  default_cf.write_buffer_size = 64 << 10;  // 64KB
  default_cf.target_file_size_base = 30 << 10;
  default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));

  one.compaction_style = kCompactionStyleUniversal;

  one.num_levels = 1;
  // trigger compaction if there are >= 4 files
  one.level0_file_num_compaction_trigger = 4;
  one.write_buffer_size = 120000;

  Reopen({default_cf, one});

  // SETUP column family "one" -- universal style
  // Only trigger - 2 files are written, fewer than the configured trigger.
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(i + 1), 1);
  }
  // cf_1_1 selects the first callback invocation, cf_1_2 the second.
  bool cf_1_1 = true;
  bool cf_1_2 = true;
  // Sync-point ordering (pairs are {predecessor, successor}; presumably the
  // successor blocks until the predecessor has been reached -- confirm in
  // util/sync_point.h):
  //   4 -> 2 and 1 -> 2: the second callback invocation is held at :2 until
  //           both :4 and :1 have been reached.
  //   4 -> 5: the main thread continues past :5 only after the first
  //           non-trivial compaction has run.
  //   1 -> 3: the first callback invocation is held at :3 until the main
  //           thread has written the additional files and hit :1.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"},
       {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"},
       {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:2"},
       {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}});
  // First invocation: hits :4, clears cf_1_1, then hits :3.
  // Second invocation: hits :2, then clears cf_1_2. Later ones do nothing.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
        if (cf_1_1) {
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
          cf_1_1 = false;
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
        } else if (cf_1_2) {
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
          cf_1_2 = false;
        }
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  // Manual compaction of column family 1.
  rocksdb::port::Thread threads([&] {
    CompactRangeOptions compact_options;
    compact_options.exclusive_manual_compaction = false;
    ASSERT_OK(
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
  });

  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");

  WaitForFlush(1);

  // Add more L0 files and force automatic compaction
  // The file count asserted here starts at level0_file_num_compaction_trigger
  // and grows by one per iteration.
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
                        1);
  }

  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");

  threads.join();
  WaitForCompaction();
  // VERIFY compaction "one"
  ASSERT_LE(NumTableFilesAtLevel(0, 1), 2);

  // Compare against saved keys
  // Every key in keys_ must still be found in column family 1.
  std::set<std::string>::iterator key_iter = keys_.begin();
  while (key_iter != keys_.end()) {
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
    key_iter++;
  }
  Close();
}

// Level-style variant of the manual-then-automatic scenario on a single
// column family: a non-exclusive manual CompactRange is started on column
// family 1 from a separate thread, then level0_file_num_compaction_trigger
// more files are written to the same column family. Sync points, including
// one inside the level compaction picker, order the steps. The test finally
// checks the file layout and that every key in keys_ is still readable.
TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) {
  Open();
  CreateColumnFamilies({"one"});
  ColumnFamilyOptions default_cf, one;
  db_options_.max_open_files = 20;  // only 10 files in file cache
  db_options_.max_background_compactions = 3;
  db_options_.base_background_compactions = 3;

  default_cf.compaction_style = kCompactionStyleLevel;
  default_cf.num_levels = 3;
  default_cf.write_buffer_size = 64 << 10;  // 64KB
  default_cf.target_file_size_base = 30 << 10;
  default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));

  one.compaction_style = kCompactionStyleLevel;

  one.num_levels = 1;
  // trigger compaction if there are >= 4 files
  one.level0_file_num_compaction_trigger = 4;
  one.write_buffer_size = 120000;

  Reopen({default_cf, one});

  // SETUP column family "one" -- level style
  // Only trigger - 2 files are written, fewer than the configured trigger.
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(i + 1), 1);
  }
  // cf_1_1 selects the first callback invocation, cf_1_2 the second.
  bool cf_1_1 = true;
  bool cf_1_2 = true;
  // Sync-point ordering (pairs are {predecessor, successor}; presumably the
  // successor blocks until the predecessor has been reached -- confirm in
  // util/sync_point.h):
  //   4 -> 2 and 3 -> 2: the second callback invocation is held at :2 until
  //           the first invocation has passed both :4 and :3.
  //   4 -> 5: the main thread continues past :5 only after the first
  //           non-trivial compaction has run.
  //   PickCompactionBySize:0 -> 3 and 1 -> 3: the first callback invocation
  //           is held at :3 until the level compaction picker has reached
  //           that point and the main thread has hit :1.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"},
       {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"},
       {"ColumnFamilyTest::ManualAuto:3", "ColumnFamilyTest::ManualAuto:2"},
       {"LevelCompactionPicker::PickCompactionBySize:0",
        "ColumnFamilyTest::ManualAuto:3"},
       {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}});
  // First invocation: hits :4, clears cf_1_1, then hits :3.
  // Second invocation: hits :2, then clears cf_1_2. Later ones do nothing.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
        if (cf_1_1) {
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
          cf_1_1 = false;
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
        } else if (cf_1_2) {
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
          cf_1_2 = false;
        }
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  // Manual compaction of column family 1.
  rocksdb::port::Thread threads([&] {
    CompactRangeOptions compact_options;
    compact_options.exclusive_manual_compaction = false;
    ASSERT_OK(
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
  });

  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");

  // Add more L0 files and force automatic compaction
  // The file count asserted here starts at level0_file_num_compaction_trigger
  // and grows by one per iteration.
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
                        1);
  }

  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");

  threads.join();
  WaitForCompaction();
  // VERIFY compaction "one"
  AssertFilesPerLevel("0,1", 1);

  // Compare against saved keys
  // Every key in keys_ must still be found in column family 1.
  std::set<std::string>::iterator key_iter = keys_.begin();
  while (key_iter != keys_.end()) {
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
    key_iter++;
  }
  Close();
}

// This test checks that an automatic compaction hits a conflict when there
// is a manual compaction which has not yet been scheduled.
// The manual compaction waits in NotScheduled.
// We generate more files and then trigger an automatic compaction.
// This will wait because there is an unscheduled manual compaction.
// Once the conflict is hit, the manual compaction starts and ends.
// Then another automatic compaction will start and end.
TEST_F(ColumnFamilyTest, SameCFManualAutomaticConflict) {
  Open();
  CreateColumnFamilies({"one"});
  ColumnFamilyOptions default_cf, one;
  db_options_.max_open_files = 20;  // only 10 files in file cache
  db_options_.max_background_compactions = 3;
  db_options_.base_background_compactions = 3;

  default_cf.compaction_style = kCompactionStyleLevel;
  default_cf.num_levels = 3;
  default_cf.write_buffer_size = 64 << 10;  // 64KB
  default_cf.target_file_size_base = 30 << 10;
  default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));

  one.compaction_style = kCompactionStyleUniversal;

  one.num_levels = 1;
  // trigger compaction if there are >= 4 files
  one.level0_file_num_compaction_trigger = 4;
  one.write_buffer_size = 120000;

  Reopen({default_cf, one});

  // SETUP column family "one" -- universal style
  // Only trigger - 2 files are written, fewer than the configured trigger.
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(i + 1), 1);
  }
  // cf_1_1 selects the first AfterRun callback invocation, cf_1_2 the second.
  bool cf_1_1 = true;
  bool cf_1_2 = true;
  // Sync-point ordering (pairs are {predecessor, successor}; presumably the
  // successor blocks until the predecessor has been reached -- confirm in
  // util/sync_point.h):
  //   Conflict -> 7: the NotScheduled callback is held at :7 until
  //           BackgroundCompaction has reported a conflict.
  //   9 -> 8: the main thread continues past :8 only after the NotScheduled
  //           callback has been entered.
  //   2 -> 6: the manual-compaction thread is held at :6 until the second
  //           AfterRun invocation has hit :2.
  //   4 -> 5: the main thread continues past :5 only after the first
  //           non-trivial compaction has run.
  //   1 -> 2 and 1 -> 3: both AfterRun invocations are held until the main
  //           thread has hit :1.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BackgroundCompaction()::Conflict",
        "ColumnFamilyTest::ManualAutoCon:7"},
       {"ColumnFamilyTest::ManualAutoCon:9",
        "ColumnFamilyTest::ManualAutoCon:8"},
       {"ColumnFamilyTest::ManualAutoCon:2",
        "ColumnFamilyTest::ManualAutoCon:6"},
       {"ColumnFamilyTest::ManualAutoCon:4",
        "ColumnFamilyTest::ManualAutoCon:5"},
       {"ColumnFamilyTest::ManualAutoCon:1",
        "ColumnFamilyTest::ManualAutoCon:2"},
       {"ColumnFamilyTest::ManualAutoCon:1",
        "ColumnFamilyTest::ManualAutoCon:3"}});
  // First invocation: hits :4, clears cf_1_1, then hits :3.
  // Second invocation: clears cf_1_2, then hits :2. Later ones do nothing.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
        if (cf_1_1) {
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:4");
          cf_1_1 = false;
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:3");
        } else if (cf_1_2) {
          cf_1_2 = false;
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:2");
        }
      });
  // The argument is treated as an InstrumentedMutex*. It is released while
  // the callback sits at the :9 and :7 sync points and re-acquired before
  // returning. (Presumably this is the DB mutex held by RunManualCompaction
  // -- confirm at the sync point's call site.)
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::RunManualCompaction:NotScheduled", [&](void* arg) {
        InstrumentedMutex* mutex = static_cast<InstrumentedMutex*>(arg);
        mutex->Unlock();
        TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:9");
        TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:7");
        mutex->Lock();
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  // Manual compaction of column family 1; hits :6 once CompactRange returns.
  rocksdb::port::Thread threads([&] {
    CompactRangeOptions compact_options;
    compact_options.exclusive_manual_compaction = false;
    ASSERT_OK(
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
    TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:6");
  });

  TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:8");
  WaitForFlush(1);

  // Add more L0 files and force automatic compaction
  // The file count asserted here starts at level0_file_num_compaction_trigger
  // and grows by one per iteration.
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
                        1);
  }

  TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:5");
  // Add more L0 files and force automatic compaction
  // Second batch; no per-iteration file count is asserted this time.
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
  }
  TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:1");

  threads.join();
  WaitForCompaction();
  // VERIFY compaction "one"
  ASSERT_LE(NumTableFilesAtLevel(0, 1), 3);

  // Compare against saved keys
  // Every key in keys_ must still be found in column family 1.
  std::set<std::string>::iterator key_iter = keys_.begin();
  while (key_iter != keys_.end()) {
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
    key_iter++;
  }

  Close();
}

// In this test, we generate enough files to trigger automatic compactions.
// The automatic compaction waits in NonTrivial:AfterRun.
// We generate more files and then trigger a manual compaction.
// This will wait because the automatic compaction has files it needs.
// Once the conflict is hit, the automatic compaction starts and ends.
// Then the manual will run and end.
TEST_F(ColumnFamilyTest, SameCFAutomaticManualCompactions) {
  Open();
  CreateColumnFamilies({"one"});
  ColumnFamilyOptions default_cf, one;
  db_options_.max_open_files = 20;  // only 10 files in file cache
  db_options_.max_background_compactions = 3;
  db_options_.base_background_compactions = 3;

  default_cf.compaction_style = kCompactionStyleLevel;
  default_cf.num_levels = 3;
  default_cf.write_buffer_size = 64 << 10;  // 64KB
  default_cf.target_file_size_base = 30 << 10;
  default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));

  one.compaction_style = kCompactionStyleUniversal;

  one.num_levels = 1;
  // trigger compaction if there are >= 4 files
  one.level0_file_num_compaction_trigger = 4;
  one.write_buffer_size = 120000;

  Reopen({default_cf, one});

  // cf_1_1 selects the first callback invocation, cf_1_2 the second.
  bool cf_1_1 = true;
  bool cf_1_2 = true;
  // Sync-point ordering (pairs are {predecessor, successor}; presumably the
  // successor blocks until the predecessor has been reached -- confirm in
  // util/sync_point.h):
  //   4 -> 2: the second callback invocation is held at :2 until the first
  //           has hit :4.
  //   4 -> 5: the main thread continues past :5 only after the first
  //           non-trivial compaction has run.
  //   CompactRange:Conflict -> 3: the first callback invocation is held at
  //           :3 until the compaction picker has reported a conflict for
  //           the manual CompactRange.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:2"},
       {"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:5"},
       {"CompactionPicker::CompactRange:Conflict",
        "ColumnFamilyTest::AutoManual:3"}});
  // First invocation: hits :4, clears cf_1_1, then hits :3.
  // Second invocation: hits :2, then clears cf_1_2. Later ones do nothing.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
        if (cf_1_1) {
          TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4");
          cf_1_1 = false;
          TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3");
        } else if (cf_1_2) {
          TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2");
          cf_1_2 = false;
        }
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // SETUP column family "one" -- universal style
  // Writes exactly level0_file_num_compaction_trigger files.
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
    AssertFilesPerLevel(ToString(i + 1), 1);
  }

  TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5");

  // Add trigger - 2 more L0 files before requesting the manual compaction
  // below.
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
    PutRandomData(1, 10, 12000, true);
    PutRandomData(1, 1, 10, true);
    WaitForFlush(1);
  }

  // Non-exclusive manual compaction, issued from the test thread itself.
  CompactRangeOptions compact_options;
  compact_options.exclusive_manual_compaction = false;
  ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));

  // No dependency loaded above names AutoManual:1.
  TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1");

  WaitForCompaction();
  // VERIFY compaction "one"
  AssertFilesPerLevel("1", 1);
  // Compare against saved keys
  // Every key in keys_ must still be found in column family 1.
  std::set<std::string>::iterator key_iter = keys_.begin();
  while (key_iter != keys_.end()) {
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
    key_iter++;
  }

  Close();
}
#endif  // !ROCKSDB_LITE

#ifndef ROCKSDB_LITE  // Tailing iterator not supported
namespace {
// Describes where `iter` currently points: "<key>-><value>" when the iterator
// is valid, or the literal "(invalid)" otherwise.
std::string IterStatus(Iterator* iter) {
  if (!iter->Valid()) {
    return "(invalid)";
  }
  return iter->key().ToString() + "->" + iter->value().ToString();
}
}  // anonymous namespace

// Creates one iterator per column family through NewIterators() and checks
// what each of them reports before and after a key is written once the
// iterators already exist. The scenario runs twice: with regular iterators
// and with tailing iterators.
TEST_F(ColumnFamilyTest, NewIteratorsTest) {
  // pass == 0 -- no tailing
  // pass == 1 -- tailing
  for (int pass = 0; pass < 2; ++pass) {
    const bool tailing = (pass == 1);

    Open();
    CreateColumnFamiliesAndReopen({"one", "two"});
    ASSERT_OK(Put(0, "a", "b"));
    ASSERT_OK(Put(1, "b", "a"));
    ASSERT_OK(Put(2, "c", "m"));
    ASSERT_OK(Put(2, "v", "t"));

    ReadOptions read_options;
    read_options.tailing = tailing;
    std::vector<Iterator*> cf_iters;
    ASSERT_OK(db_->NewIterators(read_options, handles_, &cf_iters));

    for (Iterator* cf_iter : cf_iters) {
      cf_iter->SeekToFirst();
    }
    ASSERT_EQ(IterStatus(cf_iters[0]), "a->b");
    ASSERT_EQ(IterStatus(cf_iters[1]), "b->a");
    ASSERT_EQ(IterStatus(cf_iters[2]), "c->m");

    // This key is written after the iterators were created.
    ASSERT_OK(Put(1, "x", "x"));

    for (Iterator* cf_iter : cf_iters) {
      cf_iter->Next();
    }

    ASSERT_EQ(IterStatus(cf_iters[0]), "(invalid)");
    if (tailing) {
      // The tailing iterator is expected to pick up the late write.
      ASSERT_EQ(IterStatus(cf_iters[1]), "x->x");
    } else {
      // The regular iterator is expected not to see it.
      ASSERT_EQ(IterStatus(cf_iters[1]), "(invalid)");
    }
    ASSERT_EQ(IterStatus(cf_iters[2]), "v->t");

    for (Iterator* cf_iter : cf_iters) {
      delete cf_iter;
    }
    Destroy();
  }
}
#endif  // !ROCKSDB_LITE

#ifndef ROCKSDB_LITE  // ReadOnlyDB is not supported
// Exercises read-only opens: a subset of the surviving column families can be
// opened and read (point lookups and NewIterators()), while naming a dropped
// column family, or leaving out "default", is expected to fail.
TEST_F(ColumnFamilyTest, ReadOnlyDBTest) {
  Open();
  CreateColumnFamiliesAndReopen({"one", "two", "three", "four"});
  ASSERT_OK(Put(0, "a", "b"));
  ASSERT_OK(Put(1, "foo", "bla"));
  ASSERT_OK(Put(2, "foo", "blabla"));
  ASSERT_OK(Put(3, "foo", "blablabla"));
  ASSERT_OK(Put(4, "foo", "blablablabla"));

  DropColumnFamilies({2});
  Close();

  // Open only a subset of column families. Indexes 0/1/2 below address
  // "default"/"one"/"four" respectively.
  AssertOpenReadOnly({"default", "one", "four"});
  ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
  ASSERT_EQ("bla", Get(1, "foo"));
  ASSERT_EQ("blablablabla", Get(2, "foo"));

  // NewIterators() on the read-only instance.
  {
    const size_t kOpenedCfs = 3;
    const char* const kFirstEntry[kOpenedCfs] = {"a->b", "foo->bla",
                                                 "foo->blablablabla"};

    std::vector<Iterator*> cf_iters;
    ASSERT_OK(db_->NewIterators(ReadOptions(), handles_, &cf_iters));

    for (Iterator* cf_iter : cf_iters) {
      cf_iter->SeekToFirst();
    }
    for (size_t cf = 0; cf < kOpenedCfs; ++cf) {
      ASSERT_EQ(IterStatus(cf_iters[cf]), kFirstEntry[cf]);
    }

    // Each opened column family holds exactly one key, so a single Next()
    // runs every iterator off the end.
    for (Iterator* cf_iter : cf_iters) {
      cf_iter->Next();
    }
    for (size_t cf = 0; cf < kOpenedCfs; ++cf) {
      ASSERT_EQ(IterStatus(cf_iters[cf]), "(invalid)");
    }

    for (Iterator* cf_iter : cf_iters) {
      delete cf_iter;
    }
  }

  Close();

  // can't open dropped column family
  const Status dropped_cf_status = OpenReadOnly({"default", "one", "two"});
  ASSERT_FALSE(dropped_cf_status.ok());

  // Can't open without specifying default column family
  const Status no_default_status = OpenReadOnly({"one", "four"});
  ASSERT_FALSE(no_default_status.ok());
}
#endif  // !ROCKSDB_LITE

#ifndef ROCKSDB_LITE  //  WaitForFlush() is not supported in lite
// Writes a little data to every column family, flushes each of them, and
// checks how many times the environment was asked for a new writable file
// while the flushes ran.
TEST_F(ColumnFamilyTest, DontRollEmptyLogs) {
  Open();
  CreateColumnFamiliesAndReopen({"one", "two", "three", "four"});

  // "default" plus the four column families created above.
  const int num_cfs = static_cast<int>(handles_.size());

  for (int cf = 0; cf < num_cfs; ++cf) {
    PutRandomData(cf, 10, 100);
  }
  int num_writable_file_start = env_->GetNumberOfNewWritableFileCalls();
  // this will trigger the flushes
  for (int cf = 0; cf < num_cfs; ++cf) {
    ASSERT_OK(Flush(cf));
  }

  // Wait on every column family that was flushed. The bound is shared with
  // the flush loop so the last column family is not left out before the
  // writable-file counter is sampled.
  for (int cf = 0; cf < num_cfs; ++cf) {
    WaitForFlush(cf);
  }
  int total_new_writable_files =
      env_->GetNumberOfNewWritableFileCalls() - num_writable_file_start;
  // NOTE(review): presumably one new file per flushed column family plus one
  // extra file for a single log roll; the test name suggests that empty logs
  // must not cause additional rolls -- confirm against DBImpl.
  ASSERT_EQ(static_cast<size_t>(total_new_writable_files), handles_.size() + 1);
  Close();
}
#endif  // !ROCKSDB_LITE

#ifndef ROCKSDB_LITE  //  WaitForCompaction() is not supported in lite
// With a small max_total_wal_size, repeatedly fills the default column family
// and checks (via the live-file count) that column family [two], which holds
// only a few old bytes, eventually gets flushed as well, while the empty
// column family [one] produces no file.
TEST_F(ColumnFamilyTest, FlushStaleColumnFamilies) {
  Open();
  CreateColumnFamilies({"one", "two"});
  ColumnFamilyOptions default_cf, one, two;
  default_cf.write_buffer_size = 100000;  // small write buffer size
  default_cf.arena_block_size = 4096;
  default_cf.disable_auto_compactions = true;
  one.disable_auto_compactions = true;
  two.disable_auto_compactions = true;
  db_options_.max_total_wal_size = 210000;

  Reopen({default_cf, one, two});

  PutRandomData(2, 1, 10);  // 10 bytes
  for (int i = 0; i < 2; ++i) {
    PutRandomData(0, 100, 1000);  // flush
    WaitForFlush(0);

    AssertCountLiveFiles(i + 1);
  }
  // third flush. now, CF [two] should be detected as stale and flushed
  // column family 1 should not be flushed since it's empty
  PutRandomData(0, 100, 1000);  // flush
  WaitForFlush(0);
  WaitForFlush(2);
  // 3 files for default column families, 1 file for column family [two], zero
  // files for column family [one], because it's empty
  AssertCountLiveFiles(4);

  // Check the flush status explicitly: if this flush failed, the log-size
  // assertion below would fail for a misleading reason.
  ASSERT_OK(Flush(0));
  ASSERT_EQ(0, dbfull()->TEST_total_log_size());
  Close();
}
#endif  // !ROCKSDB_LITE

// Opening with column families that do not exist yet is expected to fail
// until create_missing_column_families is switched on.
TEST_F(ColumnFamilyTest, CreateMissingColumnFamilies) {
  // This first attempt names only "one" and "two"; it must be rejected.
  Status s = TryOpen({"one", "two"});
  ASSERT_TRUE(!s.ok());
  db_options_.create_missing_column_families = true;
  s = TryOpen({"default", "one", "two"});
  // ASSERT_OK reports the Status text on failure, unlike ASSERT_TRUE(s.ok()).
  ASSERT_OK(s);
  Close();
}

// Feeds SanitizeOptions() a grid of compaction styles, level counts, L0
// triggers and write buffer sizes, and checks the invariants of the result:
// trigger ordering, minimum number of levels, and the derived
// arena_block_size.
TEST_F(ColumnFamilyTest, SanitizeOptions) {
  DBOptions db_options;
  for (int style = kCompactionStyleLevel; style <= kCompactionStyleUniversal;
       ++style) {
    for (int levels = 0; levels <= 2; ++levels) {
      for (int stop = 1; stop <= 3; ++stop) {
        for (int slowdown = 1; slowdown <= 3; ++slowdown) {
          for (int trigger = 1; trigger <= 3; ++trigger) {
            ColumnFamilyOptions requested;
            requested.compaction_style = static_cast<CompactionStyle>(style);
            requested.num_levels = levels;
            requested.level0_stop_writes_trigger = stop;
            requested.level0_slowdown_writes_trigger = slowdown;
            requested.level0_file_num_compaction_trigger = trigger;
            requested.write_buffer_size = levels * 4 * 1024 * 1024 +
                                          stop * 1024 * 1024 +
                                          slowdown * 1024 + trigger;

            const ColumnFamilyOptions sanitized =
                SanitizeOptions(ImmutableDBOptions(db_options), requested);

            // stop >= slowdown >= compaction trigger, and the compaction
            // trigger itself must be left untouched.
            ASSERT_GE(sanitized.level0_stop_writes_trigger,
                      sanitized.level0_slowdown_writes_trigger);
            ASSERT_GE(sanitized.level0_slowdown_writes_trigger,
                      sanitized.level0_file_num_compaction_trigger);
            ASSERT_EQ(sanitized.level0_file_num_compaction_trigger,
                      requested.level0_file_num_compaction_trigger);

            if (style == kCompactionStyleLevel) {
              ASSERT_GE(sanitized.num_levels, 2);
            } else {
              ASSERT_GE(sanitized.num_levels, 1);
              if (requested.num_levels >= 1) {
                ASSERT_EQ(sanitized.num_levels, requested.num_levels);
              }
            }

            // SanitizeOptions is expected to set arena_block_size to 1/8 of
            // write_buffer_size, rounded up to a multiple of 4k.
            size_t expected_arena_block_size =
                levels * 4 * 1024 * 1024 / 8 + stop * 1024 * 1024 / 8;
            // slowdown and trigger are both >= 1 in these loops, so the size
            // is never a multiple of 4k and this branch is always taken.
            if (slowdown + trigger != 0) {
              expected_arena_block_size += 4 * 1024;
            }
            ASSERT_EQ(expected_arena_block_size, sanitized.arena_block_size);
          }
        }
      }
    }
  }
}

// Checks that an iterator opened on a column family keeps working after the
// column family is dropped (pass 0) or after its handle is destroyed
// (pass 1), and that all column families remain readable afterwards.
TEST_F(ColumnFamilyTest, ReadDroppedColumnFamily) {
  // pass 0 -- drop CF, don't reopen
  // pass 1 -- delete CF handle, reopen
  for (int pass = 0; pass < 2; ++pass) {
    const bool drop_cf = (pass == 0);

    db_options_.create_missing_column_families = true;
    db_options_.max_open_files = 20;
    // delete obsolete files always
    db_options_.delete_obsolete_files_period_micros = 0;
    Open({"default", "one", "two"});

    ColumnFamilyOptions cf_options;
    cf_options.level0_file_num_compaction_trigger = 100;
    cf_options.level0_slowdown_writes_trigger = 200;
    cf_options.level0_stop_writes_trigger = 200;
    cf_options.write_buffer_size = 100000;  // small write buffer size
    Reopen({cf_options, cf_options, cf_options});

    // 1MB should create ~10 files for each CF
    const int kKeysNum = 10000;
    for (int cf = 0; cf < 3; ++cf) {
      PutRandomData(cf, kKeysNum, 100);
    }

    {
      std::unique_ptr<Iterator> live_iter(
          db_->NewIterator(ReadOptions(), handles_[2]));
      live_iter->SeekToFirst();

      if (drop_cf) {
        // Drop CF two
        ASSERT_OK(db_->DropColumnFamily(handles_[2]));
      } else {
        // delete CF two
        db_->DestroyColumnFamilyHandle(handles_[2]);
        handles_[2] = nullptr;
      }

      // The iterator created beforehand must still walk every key.
      int seen = 0;
      while (live_iter->Valid()) {
        ASSERT_OK(live_iter->status());
        ++seen;
        live_iter->Next();
      }
      ASSERT_OK(live_iter->status());
      ASSERT_EQ(seen, kKeysNum);
    }

    // Add bunch more data to other CFs
    for (int cf = 0; cf < 2; ++cf) {
      PutRandomData(cf, kKeysNum, 100);
    }

    if (!drop_cf) {
      Reopen();
    }

    // Since we didn't delete CF handle, RocksDB's contract guarantees that
    // we're still able to read dropped CF
    for (int cf = 0; cf < 3; ++cf) {
      // CFs 0 and 1 were written twice, CF 2 only once.
      const int expected_keys = (cf == 2) ? kKeysNum : kKeysNum * 2;

      std::unique_ptr<Iterator> scan(
          db_->NewIterator(ReadOptions(), handles_[cf]));
      int seen = 0;
      scan->SeekToFirst();
      while (scan->Valid()) {
        ASSERT_OK(scan->status());
        ++seen;
        scan->Next();
      }
      ASSERT_OK(scan->status());
      ASSERT_EQ(seen, expected_keys);
    }

    Close();
    Destroy();
  }
}

// Races a DropColumnFamily() call against a flush of the same column family,
// using sync-point dependencies to interleave the two, and then verifies that
// every key written to the dropped column family can still be read through
// its (not yet destroyed) handle.
TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) {
  db_options_.create_missing_column_families = true;
  Open({"default", "one"});
  ColumnFamilyOptions options;
  options.level0_file_num_compaction_trigger = 100;
  options.level0_slowdown_writes_trigger = 200;
  options.level0_stop_writes_trigger = 200;
  options.max_write_buffer_number = 20;
  options.write_buffer_size = 100000;  // small write buffer size
  Reopen({options, options});

  // NOTE(review): each pair is presumably {predecessor, successor}, i.e. the
  // second sync point is held back until the first has been reached --
  // confirm against SyncPoint::LoadDependency. Read that way, the drop steps
  // :0 and :1 gate the flush steps, and step :2 of the drop waits for
  // FlushJob::InstallResults.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"VersionSet::LogAndApply::ColumnFamilyDrop:0",
        "FlushJob::WriteLevel0Table"},
       {"VersionSet::LogAndApply::ColumnFamilyDrop:1",
        "FlushJob::InstallResults"},
       {"FlushJob::InstallResults",
        "VersionSet::LogAndApply::ColumnFamilyDrop:2"}});

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  test::SleepingBackgroundTask sleeping_task;

  // Occupy the HIGH priority pool with a sleeping task until WakeUp() below.
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
                 Env::Priority::HIGH);

  // 1MB should create ~10 files for each CF
  int kKeysNum = 10000;
  PutRandomData(1, kKeysNum, 100);

  // Drop the column family from a separate thread; it is joined at the end.
  std::vector<port::Thread> threads;
  threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); });

  sleeping_task.WakeUp();
  sleeping_task.WaitUntilDone();
  sleeping_task.Reset();
  // now we sleep again. this is just so we're certain that flush job finished
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
                 Env::Priority::HIGH);
  sleeping_task.WakeUp();
  sleeping_task.WaitUntilDone();

  {
    // Since we didn't delete CF handle, RocksDB's contract guarantees that
    // we're still able to read dropped CF
    std::unique_ptr<Iterator> iterator(
        db_->NewIterator(ReadOptions(), handles_[1]));
    int count = 0;
    for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
      ASSERT_OK(iterator->status());
      ++count;
    }
    ASSERT_OK(iterator->status());
    ASSERT_EQ(count, kKeysNum);
  }
  for (auto& t : threads) {
    t.join();
  }

  Close();
  Destroy();
}

#ifndef ROCKSDB_LITE
// skipped as persisting options is not supported in ROCKSDB_LITE
namespace {
// Progress marker shared between the main thread and the dropping thread of
// the CreateAndDropRace test. It only ever takes the stage values below.
std::atomic<int> test_stage(0);
const int kMainThreadStartPersistingOptionsFile = 1;
const int kChildThreadFinishDroppingColumnFamily = 2;
const int kChildThreadWaitingMainThreadPersistOptions = 3;

// Thread body: waits until the main thread has started persisting the options
// file, drops column family `cf_id` through the test fixture, frees that
// column family's comparator (clearing its slot in `comparators`), and
// finally publishes kChildThreadFinishDroppingColumnFamily.
void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id,
                            std::vector<Comparator*>* comparators) {
  // Poll until test_stage reaches the "main thread is persisting" stage.
  for (;;) {
    if (test_stage.load() >= kMainThreadStartPersistingOptionsFile) {
      break;
    }
    Env::Default()->SleepForMicroseconds(100);
  }

  cf_test->DropColumnFamilies({cf_id});

  Comparator*& slot = (*comparators)[cf_id];
  if (slot != nullptr) {
    delete slot;
    slot = nullptr;
  }

  test_stage.store(kChildThreadFinishDroppingColumnFamily);
}
}  // namespace

// Drops two column families concurrently: the main thread drops CF 2 while a
// helper thread (DropSingleColumnFamily) drops CF 1 and deletes its
// comparator. Sync-point callbacks use `test_stage` to hold the main thread
// inside PersistRocksDBOptions until the helper thread has made progress.
TEST_F(ColumnFamilyTest, CreateAndDropRace) {
  // Note: five option sets/comparators are prepared, but only the first four
  // are passed to Open() below.
  const int kCfCount = 5;
  std::vector<ColumnFamilyOptions> cf_opts;
  std::vector<Comparator*> comparators;
  for (int i = 0; i < kCfCount; ++i) {
    cf_opts.emplace_back();
    comparators.push_back(new test::SimpleSuffixReverseComparator());
    cf_opts.back().comparator = comparators.back();
  }
  db_options_.create_if_missing = true;
  db_options_.create_missing_column_families = true;

  auto main_thread_id = std::this_thread::get_id();

  rocksdb::SyncPoint::GetInstance()->SetCallBack("PersistRocksDBOptions:start",
                                                 [&](void* arg) {
    auto current_thread_id = std::this_thread::get_id();
    // If it's the main thread hitting this sync-point, then it
    // will be blocked until some other thread updates the test_stage.
    if (main_thread_id == current_thread_id) {
      test_stage = kMainThreadStartPersistingOptionsFile;
      while (test_stage < kChildThreadFinishDroppingColumnFamily) {
        Env::Default()->SleepForMicroseconds(100);
      }
    }
  });

  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::EnterUnbatched:Wait", [&](void* arg) {
        // This means a thread doing DropColumnFamily() is waiting for
        // other thread to finish persisting options.
        // In such case, we update the test_stage to unblock the main thread.
        test_stage = kChildThreadWaitingMainThreadPersistOptions;

        // Note that based on the test setting, this must not be the
        // main thread.
        ASSERT_NE(main_thread_id, std::this_thread::get_id());
      });

  // Create a database with four column families
  Open({"default", "one", "two", "three"},
       {cf_opts[0], cf_opts[1], cf_opts[2], cf_opts[3]});

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Start a thread that will drop column family 1 ("one")
  // and its comparator
  rocksdb::port::Thread drop_cf_thread(DropSingleColumnFamily, this, 1, &comparators);

  // Meanwhile the main thread drops column family 2 ("two").
  DropColumnFamilies({2});

  drop_cf_thread.join();
  Close();
  Destroy();
  // Free the comparators that the helper thread did not already delete (it
  // nulls the slot of the one it frees).
  for (auto* comparator : comparators) {
    if (comparator) {
      delete comparator;
    }
  }
}
#endif  // !ROCKSDB_LITE

// Drives RecalculateWriteStallConditions() on the default column family with
// hand-set values for the estimated pending compaction bytes and the L0 delay
// trigger count, and after every step checks the write controller state
// (stopped / delayed / delayed write rate) and, at some steps, the number of
// background compactions allowed. The steps are order-dependent: each
// expected rate depends on the state left behind by the previous step.
TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
  const uint64_t kBaseRate = 800000u;
  db_options_.delayed_write_rate = kBaseRate;
  db_options_.base_background_compactions = 2;
  db_options_.max_background_compactions = 6;

  Open({"default"});
  ColumnFamilyData* cfd =
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();

  VersionStorageInfo* vstorage = cfd->current()->storage_info();

  MutableCFOptions mutable_cf_options(column_family_options_);

  // Soft limit 200 bytes, hard limit 2000 bytes, L0 slowdown at 20 files. The
  // L0 stop trigger is set high enough that it is never reached below.
  mutable_cf_options.level0_slowdown_writes_trigger = 20;
  mutable_cf_options.level0_stop_writes_trigger = 10000;
  mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
  mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
  mutable_cf_options.disable_auto_compactions = false;

  // Below the soft limit: neither stopped nor delayed.
  vstorage->TEST_set_estimated_compaction_needed_bytes(50);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

  // Just above the soft limit: delay starts at the base rate.
  vstorage->TEST_set_estimated_compaction_needed_bytes(201);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  // Pending bytes keep growing: the expected rate shrinks by 1.25x per step.
  vstorage->TEST_set_estimated_compaction_needed_bytes(400);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  vstorage->TEST_set_estimated_compaction_needed_bytes(500);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25 / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  // Pending bytes shrink while still above the soft limit: the expected rate
  // recovers step by step, but never beyond the base rate.
  vstorage->TEST_set_estimated_compaction_needed_bytes(450);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage->TEST_set_estimated_compaction_needed_bytes(205);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage->TEST_set_estimated_compaction_needed_bytes(202);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage->TEST_set_estimated_compaction_needed_bytes(201);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());

  // Back under the soft limit: the delay is lifted.
  vstorage->TEST_set_estimated_compaction_needed_bytes(198);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

  // Re-entering the delayed state starts again from the base rate.
  vstorage->TEST_set_estimated_compaction_needed_bytes(399);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage->TEST_set_estimated_compaction_needed_bytes(599);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  // Above the hard limit: writes are stopped rather than delayed.
  vstorage->TEST_set_estimated_compaction_needed_bytes(2001);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  vstorage->TEST_set_estimated_compaction_needed_bytes(3001);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

  // Coming back from the stopped state into the delayed range.
  vstorage->TEST_set_estimated_compaction_needed_bytes(390);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage->TEST_set_estimated_compaction_needed_bytes(100);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

  // Same exercise driven by the L0 delay trigger count (slowdown trigger is
  // 20) while pending bytes stay under the soft limit.
  vstorage->set_l0_delay_trigger_count(100);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  vstorage->set_l0_delay_trigger_count(101);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  // Switching the cause of the delay between L0 count and pending bytes.
  vstorage->set_l0_delay_trigger_count(0);
  vstorage->TEST_set_estimated_compaction_needed_bytes(300);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25 / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage->set_l0_delay_trigger_count(101);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25 / 1.25 / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage->TEST_set_estimated_compaction_needed_bytes(200);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25 / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  // Both triggers cleared: no stall at all.
  vstorage->set_l0_delay_trigger_count(0);
  vstorage->TEST_set_estimated_compaction_needed_bytes(0);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

  // With auto compactions disabled, the same trigger values are expected not
  // to cause any delay, and the rate (reset to the base rate here) is
  // expected to stay put.
  mutable_cf_options.disable_auto_compactions = true;
  dbfull()->TEST_write_controler().set_delayed_write_rate(kBaseRate);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

  vstorage->set_l0_delay_trigger_count(50);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage->set_l0_delay_trigger_count(60);
  vstorage->TEST_set_estimated_compaction_needed_bytes(300);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());

  // Re-enabling auto compactions brings the delay back, starting at the base
  // rate.
  mutable_cf_options.disable_auto_compactions = false;
  vstorage->set_l0_delay_trigger_count(70);
  vstorage->TEST_set_estimated_compaction_needed_bytes(500);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage->set_l0_delay_trigger_count(71);
  vstorage->TEST_set_estimated_compaction_needed_bytes(501);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());
}

// Checks how many background compactions are allowed for a single column
// family: the base value (2) while compaction debt and L0 file count are
// below the speed-up thresholds, and the max value (6) once either threshold
// is reached.
TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
  db_options_.base_background_compactions = 2;
  db_options_.max_background_compactions = 6;
  Open({"default"});
  ColumnFamilyData* cfd =
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();

  VersionStorageInfo* vstorage = cfd->current()->storage_info();

  MutableCFOptions mutable_cf_options(column_family_options_);

  // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
  mutable_cf_options.level0_file_num_compaction_trigger = 4;
  mutable_cf_options.level0_slowdown_writes_trigger = 36;
  mutable_cf_options.level0_stop_writes_trigger = 50;
  // Speedup threshold = 200 / 4 = 50
  mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
  mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;

  // Pending-bytes threshold: 40 and 45 are below it, 50 and 300 are not.
  vstorage->TEST_set_estimated_compaction_needed_bytes(40);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());

  vstorage->TEST_set_estimated_compaction_needed_bytes(50);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  vstorage->TEST_set_estimated_compaction_needed_bytes(300);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  vstorage->TEST_set_estimated_compaction_needed_bytes(45);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());

  // L0 file-count threshold (8): 7 and 6 are below it, 9 is above.
  vstorage->set_l0_delay_trigger_count(7);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());

  vstorage->set_l0_delay_trigger_count(9);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  vstorage->set_l0_delay_trigger_count(6);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());

  // Speed up threshold = min(4 * 2, 4 + (16 - 4)/4) = 7
  // NOTE(review): derived by applying the formula quoted above to the
  // slowdown trigger of 16 set here; the assertions below (5 -> base,
  // 7 -> max) are consistent with it -- confirm against ColumnFamilyData.
  mutable_cf_options.level0_file_num_compaction_trigger = 4;
  mutable_cf_options.level0_slowdown_writes_trigger = 16;
  mutable_cf_options.level0_stop_writes_trigger = 30;

  vstorage->set_l0_delay_trigger_count(5);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());

  vstorage->set_l0_delay_trigger_count(7);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  vstorage->set_l0_delay_trigger_count(3);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
}

// Two-column-family variant of the write stall test: pending compaction bytes
// are set alternately on the default column family (cfd/vstorage) and on
// column family "one" (cfd1/vstorage1), and the shared write controller is
// checked after each recalculation. The steps are order-dependent.
TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
  const uint64_t kBaseRate = 810000u;
  db_options_.delayed_write_rate = kBaseRate;
  Open();
  CreateColumnFamilies({"one"});
  ColumnFamilyData* cfd =
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
  VersionStorageInfo* vstorage = cfd->current()->storage_info();

  ColumnFamilyData* cfd1 =
      static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
  VersionStorageInfo* vstorage1 = cfd1->current()->storage_info();

  MutableCFOptions mutable_cf_options(column_family_options_);
  mutable_cf_options.level0_slowdown_writes_trigger = 20;
  mutable_cf_options.level0_stop_writes_trigger = 10000;
  mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
  mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;

  // NOTE(review): mutable_cf_options1 (soft limit 500) is prepared here but
  // every RecalculateWriteStallConditions() call below, including those on
  // cfd1, passes mutable_cf_options -- confirm whether that is intended.
  MutableCFOptions mutable_cf_options1 = mutable_cf_options;
  mutable_cf_options1.soft_pending_compaction_bytes_limit = 500;

  vstorage->TEST_set_estimated_compaction_needed_bytes(50);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

  // The bytes are set on "one" but the recalculation runs on the default
  // column family, so no delay is expected yet.
  vstorage1->TEST_set_estimated_compaction_needed_bytes(201);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

  // Recalculating on "one" itself triggers the delay at the base rate.
  vstorage1->TEST_set_estimated_compaction_needed_bytes(600);
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());

  // The default column family is below its soft limit, yet the delay caused
  // by "one" is expected to remain in effect.
  vstorage->TEST_set_estimated_compaction_needed_bytes(70);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage1->TEST_set_estimated_compaction_needed_bytes(800);
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage->TEST_set_estimated_compaction_needed_bytes(300);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25 / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage1->TEST_set_estimated_compaction_needed_bytes(700);
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage->TEST_set_estimated_compaction_needed_bytes(500);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25 / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());

  vstorage1->TEST_set_estimated_compaction_needed_bytes(600);
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_EQ(kBaseRate / 1.25,
            dbfull()->TEST_write_controler().delayed_write_rate());
}

// Two-column-family variant of the compaction speed-up test: the number of
// background compactions allowed is expected to be the max value (6) while at
// least one of the two column families is over a speed-up threshold, and the
// base value (2) otherwise.
TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
  db_options_.base_background_compactions = 2;
  db_options_.max_background_compactions = 6;
  column_family_options_.soft_pending_compaction_bytes_limit = 200;
  column_family_options_.hard_pending_compaction_bytes_limit = 2000;
  Open();
  CreateColumnFamilies({"one"});
  ColumnFamilyData* cfd =
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
  VersionStorageInfo* vstorage = cfd->current()->storage_info();

  ColumnFamilyData* cfd1 =
      static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
  VersionStorageInfo* vstorage1 = cfd1->current()->storage_info();

  MutableCFOptions mutable_cf_options(column_family_options_);
  // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
  mutable_cf_options.level0_file_num_compaction_trigger = 4;
  mutable_cf_options.level0_slowdown_writes_trigger = 36;
  // NOTE(review): the stop trigger (30) is lower than the slowdown trigger
  // (36) set just above -- confirm this is intentional.
  mutable_cf_options.level0_stop_writes_trigger = 30;
  // Speedup threshold = 200 / 4 = 50
  mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
  mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;

  // NOTE(review): mutable_cf_options1 is prepared here but never passed to
  // RecalculateWriteStallConditions() below; all calls use
  // mutable_cf_options -- confirm whether that is intended.
  MutableCFOptions mutable_cf_options1 = mutable_cf_options;
  mutable_cf_options1.level0_slowdown_writes_trigger = 16;

  vstorage->TEST_set_estimated_compaction_needed_bytes(40);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());

  // The default column family goes over the threshold, but the change is
  // only expected to take effect once that column family itself is
  // recalculated.
  vstorage->TEST_set_estimated_compaction_needed_bytes(60);
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  // Changes on "one" do not lower the allowance while the default column
  // family is still over the threshold.
  vstorage1->TEST_set_estimated_compaction_needed_bytes(30);
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  vstorage1->TEST_set_estimated_compaction_needed_bytes(70);
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  // Default drops below the threshold but "one" is still above it.
  vstorage->TEST_set_estimated_compaction_needed_bytes(20);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  // Both below the threshold: back to the base value.
  vstorage1->TEST_set_estimated_compaction_needed_bytes(3);
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());

  // Same pattern driven by the L0 delay trigger count.
  vstorage->set_l0_delay_trigger_count(9);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  vstorage1->set_l0_delay_trigger_count(2);
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());

  vstorage->set_l0_delay_trigger_count(0);
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
  ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
}

#ifndef ROCKSDB_LITE
// Checks the number of open WAL files around a flush: two are open while
// flush jobs are held back, and one is left once the flush has reported
// "DBImpl::BGWorkFlush:done".
TEST_F(ColumnFamilyTest, FlushCloseWALFiles) {
  SpecialEnv special_env(Env::Default());
  db_options_.env = &special_env;
  db_options_.max_background_flushes = 1;
  column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
  Open();
  CreateColumnFamilies({"one"});
  ASSERT_OK(Put(1, "fodor", "mirko"));
  ASSERT_OK(Put(0, "fodor", "mirko"));
  ASSERT_OK(Put(1, "fodor", "mirko"));

  // The test thread's "FlushCloseWALFiles:0" point is tied to the flush
  // job's "done" point.
  auto* const sync_point = rocksdb::SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"DBImpl::BGWorkFlush:done", "FlushCloseWALFiles:0"}});
  sync_point->EnableProcessing();

  // Keep flush jobs from running by scheduling a sleeping task at HIGH
  // priority.
  test::SleepingBackgroundTask flush_blocker;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &flush_blocker,
                 Env::Priority::HIGH);

  WriteOptions sync_write;
  sync_write.sync = true;
  ASSERT_OK(db_->Put(sync_write, handles_[1], "fodor", "mirko"));

  // Flushes are still held back: both WAL files are expected to be open.
  ASSERT_EQ(2, special_env.num_open_wal_file_.load());

  // Release the blocker, then wait for the flush to report completion.
  flush_blocker.WakeUp();
  flush_blocker.WaitUntilDone();
  TEST_SYNC_POINT("FlushCloseWALFiles:0");
  sync_point->DisableProcessing();
  ASSERT_EQ(1, special_env.num_open_wal_file_.load());

  Reopen();
  ASSERT_EQ("mirko", Get(0, "fodor"));
  ASSERT_EQ("mirko", Get(1, "fodor"));
  // Put the fixture's own env back before the local SpecialEnv goes away.
  db_options_.env = env_;
  Close();
}
#endif  // !ROCKSDB_LITE

#ifndef ROCKSDB_LITE  // WaitForFlush() is not supported
// Checks that destroying an iterator drops the open WAL file count from two
// to one while flush jobs are being held back.
TEST_F(ColumnFamilyTest, IteratorCloseWALFile1) {
  SpecialEnv special_env(Env::Default());
  db_options_.env = &special_env;
  db_options_.max_background_flushes = 1;
  column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
  Open();
  CreateColumnFamilies({"one"});
  ASSERT_OK(Put(1, "fodor", "mirko"));
  // The iterator pins the super version that is current right now.
  ReadOptions read_opts;
  Iterator* iter = db_->NewIterator(read_opts, handles_[1]);
  // After this flush, `iter` holds the last reference to that super version.
  Flush(1);

  ASSERT_OK(Put(1, "fodor", "mirko"));
  ASSERT_OK(Put(0, "fodor", "mirko"));
  ASSERT_OK(Put(1, "fodor", "mirko"));

  // Flush jobs close previous WAL files once they finish. By blocking flush
  // jobs from running, we set up the situation in which the iterator's
  // destructor is the one that should close the WAL files.
  test::SleepingBackgroundTask flush_blocker;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &flush_blocker,
                 Env::Priority::HIGH);

  WriteOptions sync_write;
  sync_write.sync = true;
  ASSERT_OK(db_->Put(sync_write, handles_[1], "fodor", "mirko"));

  ASSERT_EQ(2, special_env.num_open_wal_file_.load());
  // Destroying the iterator releases its super version; the test expects the
  // open WAL file count to drop immediately as a result.
  delete iter;
  ASSERT_EQ(1, special_env.num_open_wal_file_.load());

  // Let the held-back flush run to completion before reopening.
  flush_blocker.WakeUp();
  flush_blocker.WaitUntilDone();
  WaitForFlush(1);

  Reopen();
  ASSERT_EQ("mirko", Get(0, "fodor"));
  ASSERT_EQ("mirko", Get(1, "fodor"));
  // Put the fixture's own env back before the local SpecialEnv goes away.
  db_options_.env = env_;
  Close();
}

// Variant of IteratorCloseWALFile1 that creates the iterator with
// ReadOptions::background_purge_on_iterator_cleanup set. The assertions
// expect the open WAL file count to stay at two right after the iterator is
// deleted and to drop to one only after the background purge job has passed
// its "end" sync point.
TEST_F(ColumnFamilyTest, IteratorCloseWALFile2) {
  SpecialEnv env(Env::Default());
  // Allow both the flush job and the purge job to be scheduled.
  env.SetBackgroundThreads(2, Env::HIGH);
  db_options_.env = &env;
  db_options_.max_background_flushes = 1;
  column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
  Open();
  CreateColumnFamilies({"one"});
  ASSERT_OK(Put(1, "fodor", "mirko"));
  // Create an iterator holding the current super version.
  ReadOptions ro;
  ro.background_purge_on_iterator_cleanup = true;
  Iterator* it = db_->NewIterator(ro, handles_[1]);
  // A flush will make `it` hold the last reference of its super version.
  Flush(1);

  ASSERT_OK(Put(1, "fodor", "mirko"));
  ASSERT_OK(Put(0, "fodor", "mirko"));
  ASSERT_OK(Put(1, "fodor", "mirko"));

  // Sync point wiring (presumably each pair is {predecessor, successor} --
  // confirm against SyncPoint::LoadDependency):
  //   test point :0  -> purge job start
  //   test point :2  -> background flush start
  //   purge job end  -> test point :1
  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"ColumnFamilyTest::IteratorCloseWALFile2:0",
       "DBImpl::BGWorkPurge:start"},
      {"ColumnFamilyTest::IteratorCloseWALFile2:2",
       "DBImpl::BackgroundCallFlush:start"},
      {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
  });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions wo;
  wo.sync = true;
  ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));

  ASSERT_EQ(2, env.num_open_wal_file_.load());
  // Deleting the iterator releases its super version. Unlike in
  // IteratorCloseWALFile1, the WAL file is not expected to be closed right
  // away: the count must still be 2 until the purge job has run.
  delete it;
  ASSERT_EQ(2, env.num_open_wal_file_.load());

  // Pass :0 and :1 (tied to the purge job's start/end points above); after
  // that the test expects a single open WAL file.
  TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
  TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
  ASSERT_EQ(1, env.num_open_wal_file_.load());
  // Pass :2 (tied to the background flush start above) and wait for the
  // flush; the count is expected to be unchanged by it.
  TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
  WaitForFlush(1);
  ASSERT_EQ(1, env.num_open_wal_file_.load());
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();

  Reopen();
  ASSERT_EQ("mirko", Get(0, "fodor"));
  ASSERT_EQ("mirko", Get(1, "fodor"));
  // Restore the fixture's env before the local SpecialEnv is destroyed.
  db_options_.env = env_;
  Close();
}
#endif  // !ROCKSDB_LITE

#ifndef ROCKSDB_LITE  // TEST functions are not supported in lite
// Tailing-iterator variant of IteratorCloseWALFile2. The iterator is created
// with ReadOptions::tailing and background_purge_on_iterator_cleanup set, and
// is re-seeked (not deleted) to trigger the cleanup. Besides the open WAL
// file count, the test also tracks SpecialEnv::delete_count_.
TEST_F(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
  SpecialEnv env(Env::Default());
  // Allow both the flush job and the purge job to be scheduled.
  env.SetBackgroundThreads(2, Env::HIGH);
  db_options_.env = &env;
  db_options_.max_background_flushes = 1;
  column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(3));
  column_family_options_.level0_file_num_compaction_trigger = 2;
  Open();
  CreateColumnFamilies({"one"});
  ASSERT_OK(Put(1, "fodor", "mirko"));
  ASSERT_OK(Put(1, "fodar2", "mirko"));
  Flush(1);

  // Create an iterator holding the current super version, as well as
  // the SST file just flushed.
  ReadOptions ro;
  ro.tailing = true;
  ro.background_purge_on_iterator_cleanup = true;
  Iterator* it = db_->NewIterator(ro, handles_[1]);

  // The flush below will make `it` hold the last reference of its super
  // version.
  ASSERT_OK(Put(1, "fodor", "mirko"));
  ASSERT_OK(Put(1, "fodar2", "mirko"));
  Flush(1);

  // level0_file_num_compaction_trigger is 2 and two flushes have been issued
  // for this column family; wait for any resulting compaction to finish.
  WaitForCompaction();

  ASSERT_OK(Put(1, "fodor", "mirko"));
  ASSERT_OK(Put(1, "fodor", "mirko"));
  ASSERT_OK(Put(0, "fodor", "mirko"));
  ASSERT_OK(Put(1, "fodor", "mirko"));

  // NOTE: the test-side sync point names are the ones used by
  // IteratorCloseWALFile2; the wiring is identical to that test.
  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"ColumnFamilyTest::IteratorCloseWALFile2:0",
       "DBImpl::BGWorkPurge:start"},
      {"ColumnFamilyTest::IteratorCloseWALFile2:2",
       "DBImpl::BackgroundCallFlush:start"},
      {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
  });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions wo;
  wo.sync = true;
  ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));

  // Start counting file deletions from here.
  env.delete_count_.store(0);
  ASSERT_EQ(2, env.num_open_wal_file_.load());
  // Unlike IteratorCloseWALFile2, the iterator is not deleted here; it is
  // re-seeked instead. The assertions that follow expect the Seek itself to
  // neither close a WAL file nor delete any file.
  it->Seek("");
  ASSERT_EQ(2, env.num_open_wal_file_.load());
  ASSERT_EQ(0, env.delete_count_.load());

  // Pass :0 and :1 (tied to the purge job's start/end points above); after
  // that one WAL file is expected to be open and one file deleted.
  TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
  TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
  ASSERT_EQ(1, env.num_open_wal_file_.load());
  ASSERT_EQ(1, env.delete_count_.load());
  // Pass :2 (tied to the background flush start above) and wait for the
  // flush; both counters are expected to be unchanged by it.
  TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
  WaitForFlush(1);
  ASSERT_EQ(1, env.num_open_wal_file_.load());
  ASSERT_EQ(1, env.delete_count_.load());

  delete it;
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();

  Reopen();
  ASSERT_EQ("mirko", Get(0, "fodor"));
  ASSERT_EQ("mirko", Get(1, "fodor"));
  // Restore the fixture's env before the local SpecialEnv is destroyed.
  db_options_.env = env_;
  Close();
}
#endif  // !ROCKSDB_LITE

// Disable on windows because SyncWAL requires env->IsSyncThreadSafe()
// to return true which is not so in unbuffered mode.
#ifndef OS_WIN
// Runs DB::SyncWAL() on a helper thread while the test thread flushes column
// family 1 twice. The sync points are wired so that (presumably, given
// {predecessor, successor} pairs -- confirm against SyncPoint::LoadDependency)
// the test thread does not start flushing until SyncWAL has reached
// "BeforeMarkLogsSynced:1", and SyncWAL does not go past
// "BeforeMarkLogsSynced:2" until both flushes have been issued.
//
// Every Status produced along the way is checked. Statuses obtained while the
// helper thread is alive are only asserted after join(), so that a failure
// can never return from the test body with a joinable thread.
TEST_F(ColumnFamilyTest, LogSyncConflictFlush) {
  Open();
  CreateColumnFamiliesAndReopen({"one", "two"});

  ASSERT_OK(Put(0, "", ""));
  ASSERT_OK(Put(1, "foo", "bar"));

  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::SyncWAL:BeforeMarkLogsSynced:1",
        "ColumnFamilyTest::LogSyncConflictFlush:1"},
       {"ColumnFamilyTest::LogSyncConflictFlush:2",
        "DBImpl::SyncWAL:BeforeMarkLogsSynced:2"}});

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Written by the helper thread, read only after join().
  Status sync_wal_status;
  rocksdb::port::Thread thread([&] { sync_wal_status = db_->SyncWAL(); });

  TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:1");
  Flush(1);
  const Status second_put_status = Put(1, "foo", "bar");
  Flush(1);

  TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:2");

  thread.join();

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();

  ASSERT_OK(second_put_status);
  ASSERT_OK(sync_wal_status);
  Close();
}
#endif

// This test lives here because the Column Family test infrastructure is used
// to force a roll of the WAL files.
// The basic idea is to check that WAL truncation is detected rather than
// silently ignored: the second-to-last WAL file is replaced by a truncated
// copy of itself, and the DB is then opened (read-only, then read-write) in
// point-in-time recovery mode.
TEST_F(ColumnFamilyTest, DISABLED_LogTruncationTest) {
  Open();
  CreateColumnFamiliesAndReopen({"one", "two"});

  Build(0, 100);

  // Flush the 0th column family to force a roll of the wal log
  Flush(0);

  // Add some more entries
  Build(100, 100);

  std::vector<std::string> filenames;
  ASSERT_OK(env_->GetChildren(dbname_, &filenames));

  // Collect the WAL files among the DB directory entries.
  std::vector<std::string> logfs;
  for (const auto& fname : filenames) {
    uint64_t number;
    FileType type;
    if (ParseFileName(fname, &number, &type) && type == kLogFile) {
      logfs.push_back(fname);
    }
  }

  std::sort(logfs.begin(), logfs.end());
  ASSERT_GE(logfs.size(), 2);

  // Take the last but one file, and truncate it
  const std::string& victim_log = logfs[logfs.size() - 2];
  std::string fpath = dbname_ + "/" + victim_log;
  std::vector<std::string> names_save = names_;

  // Not sure how easy it is to make this data driven.
  // need to read back the WAL file and truncate last 10
  // entries
  const uint64_t kTruncatedBytes = 9180;

  uint64_t fsize;
  ASSERT_OK(env_->GetFileSize(fpath, &fsize));
  // fsize is unsigned: make sure the subtraction below cannot wrap around.
  ASSERT_GT(fsize, kTruncatedBytes);

  Close();

  std::string backup_logs = dbname_ + "/backup_logs";
  std::string t_fpath = backup_logs + "/" + victim_log;

  ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
  // Write the shortened copy next to the DB, then swap it in for the
  // original WAL file.
  CopyFile(fpath, t_fpath, fsize - kTruncatedBytes);

  ASSERT_OK(env_->DeleteFile(fpath));
  ASSERT_OK(env_->RenameFile(t_fpath, fpath));

  db_options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;

  OpenReadOnly(names_save);

  CheckMissed();

  Close();

  Open(names_save);

  CheckMissed();

  Close();

  // cleanup
  ASSERT_OK(env_->DeleteDir(backup_logs));
}
}  // namespace rocksdb

// Test binary entry point: lets Google Test consume its command-line flags,
// then runs every registered test and reports the result as the exit code.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}