//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
#include <algorithm>
#include <vector>

#include "db/db_test_util.h"
#include "env/mock_env.h"
#include "file/line_file_reader.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "file/readahead_raf.h"
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "rocksdb/file_system.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/crc32c.h"
#include "util/random.h"
#include "utilities/fault_injection_fs.h"

namespace ROCKSDB_NAMESPACE {

class WritableFileWriterTest : public testing::Test {};

constexpr uint32_t kMb = static_cast<uint32_t>(1) << 20;

TEST_F(WritableFileWriterTest, RangeSync) {
  class FakeWF : public FSWritableFile {
   public:
    explicit FakeWF() : size_(0), last_synced_(0) {}
    ~FakeWF() override {}

    using FSWritableFile::Append;
    IOStatus Append(const Slice& data, const IOOptions& /*options*/,
                    IODebugContext* /*dbg*/) override {
      size_ += data.size();
      return IOStatus::OK();
    }
    IOStatus Truncate(uint64_t /*size*/, const IOOptions& /*options*/,
                      IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Close(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      EXPECT_GE(size_, last_synced_ + kMb);
      EXPECT_LT(size_, last_synced_ + 2 * kMb);
      // Make sure random writes generated enough writes.
      EXPECT_GT(size_, 10 * kMb);
      return IOStatus::OK();
    }
    IOStatus Flush(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Sync(const IOOptions& /*options*/,
                  IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Fsync(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    void SetIOPriority(Env::IOPriority /*pri*/) override {}
    uint64_t GetFileSize(const IOOptions& /*options*/,
                         IODebugContext* /*dbg*/) override {
      return size_;
    }
    void GetPreallocationStatus(size_t* /*block_size*/,
                                size_t* /*last_allocated_block*/) override {}
    size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
      return 0;
    }
    IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
      return IOStatus::OK();
    }

   protected:
    IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
                      const IOOptions& /*options*/,
                      IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus RangeSync(uint64_t offset, uint64_t nbytes,
                       const IOOptions& /*options*/,
                       IODebugContext* /*dbg*/) override {
      EXPECT_EQ(offset % 4096, 0u);
      EXPECT_EQ(nbytes % 4096, 0u);

      EXPECT_EQ(offset, last_synced_);
      last_synced_ = offset + nbytes;
      EXPECT_GE(size_, last_synced_ + kMb);
      if (size_ > 2 * kMb) {
        EXPECT_LT(size_, last_synced_ + 2 * kMb);
      }
      return IOStatus::OK();
    }

    uint64_t size_;
    uint64_t last_synced_;
  };

  EnvOptions env_options;
  env_options.bytes_per_sync = kMb;
  std::unique_ptr<FakeWF> wf(new FakeWF);
  std::unique_ptr<WritableFileWriter> writer(
      new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
  Random r(301);
  Status s;
  std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
  for (int i = 0; i < 1000; i++) {
    int skew_limit = (i < 700) ? 10 : 15;
    uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100);
    s = writer->Append(Slice(large_buf.get(), num));
    ASSERT_OK(s);

    // Flush in a chance of 1/10.
    if (r.Uniform(10) == 0) {
      s = writer->Flush();
      ASSERT_OK(s);
    }
  }
  s = writer->Close();
  ASSERT_OK(s);
}

TEST_F(WritableFileWriterTest, IncrementalBuffer) {
  class FakeWF : public FSWritableFile {
   public:
    explicit FakeWF(std::string* _file_data, bool _use_direct_io,
                    bool _no_flush)
        : file_data_(_file_data),
          use_direct_io_(_use_direct_io),
          no_flush_(_no_flush) {}
    ~FakeWF() override {}

    using FSWritableFile::Append;
    IOStatus Append(const Slice& data, const IOOptions& /*options*/,
                    IODebugContext* /*dbg*/) override {
      file_data_->append(data.data(), data.size());
      size_ += data.size();
      return IOStatus::OK();
    }
    using FSWritableFile::PositionedAppend;
    IOStatus PositionedAppend(const Slice& data, uint64_t pos,
                              const IOOptions& /*options*/,
                              IODebugContext* /*dbg*/) override {
      EXPECT_TRUE(pos % 512 == 0);
      EXPECT_TRUE(data.size() % 512 == 0);
      file_data_->resize(pos);
      file_data_->append(data.data(), data.size());
      size_ += data.size();
      return IOStatus::OK();
    }

    IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
                      IODebugContext* /*dbg*/) override {
      file_data_->resize(size);
      return IOStatus::OK();
    }
    IOStatus Close(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Flush(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Sync(const IOOptions& /*options*/,
                  IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Fsync(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    void SetIOPriority(Env::IOPriority /*pri*/) override {}
    uint64_t GetFileSize(const IOOptions& /*options*/,
                         IODebugContext* /*dbg*/) override {
      return size_;
    }
    void GetPreallocationStatus(size_t* /*block_size*/,
                                size_t* /*last_allocated_block*/) override {}
    size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
      return 0;
    }
    IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
      return IOStatus::OK();
    }
    bool use_direct_io() const override { return use_direct_io_; }

    std::string* file_data_;
    bool use_direct_io_;
    bool no_flush_;
    size_t size_ = 0;
  };

  Random r(301);
  const int kNumAttempts = 50;
  for (int attempt = 0; attempt < kNumAttempts; attempt++) {
    bool no_flush = (attempt % 3 == 0);
    EnvOptions env_options;
    env_options.writable_file_max_buffer_size =
        (attempt < kNumAttempts / 2) ? 512 * 1024 : 700 * 1024;
    std::string actual;
    std::unique_ptr<FakeWF> wf(new FakeWF(&actual,
#ifndef ROCKSDB_LITE
                                          attempt % 2 == 1,
#else
                                          false,
#endif
                                          no_flush));
    std::unique_ptr<WritableFileWriter> writer(new WritableFileWriter(
        std::move(wf), "" /* don't care */, env_options));

    std::string target;
    for (int i = 0; i < 20; i++) {
      uint32_t num = r.Skewed(16) * 100 + r.Uniform(100);
      std::string random_string = r.RandomString(num);
      ASSERT_OK(writer->Append(Slice(random_string.c_str(), num)));
      target.append(random_string.c_str(), num);

      // In some attempts, flush in a chance of 1/10.
      if (!no_flush && r.Uniform(10) == 0) {
        ASSERT_OK(writer->Flush());
      }
    }
    ASSERT_OK(writer->Flush());
    ASSERT_OK(writer->Close());
    ASSERT_EQ(target.size(), actual.size());
    ASSERT_EQ(target, actual);
  }
}

TEST_F(WritableFileWriterTest, BufferWithZeroCapacityDirectIO) {
  EnvOptions env_opts;
  env_opts.use_direct_writes = true;
  env_opts.writable_file_max_buffer_size = 0;
  {
    std::unique_ptr<WritableFileWriter> writer;
    const Status s =
        WritableFileWriter::Create(FileSystem::Default(), /*fname=*/"dont_care",
                                   FileOptions(env_opts), &writer,
                                   /*dbg=*/nullptr);
    ASSERT_TRUE(s.IsInvalidArgument());
  }
}

class DBWritableFileWriterTest : public DBTestBase {
 public:
  DBWritableFileWriterTest()
      : DBTestBase("db_secondary_cache_test", /*env_do_fsync=*/true) {
    fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem()));
    fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_));
  }

  std::shared_ptr<FaultInjectionTestFS> fault_fs_;
  std::unique_ptr<Env> fault_env_;
};

TEST_F(DBWritableFileWriterTest, AppendWithChecksum) {
  FileOptions file_options = FileOptions();
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  DestroyAndReopen(options);
  std::string fname = dbname_ + "/test_file";
  std::unique_ptr<FSWritableFile> writable_file_ptr;
  ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
                                       /*dbg*/ nullptr));
  std::unique_ptr<TestFSWritableFile> file;
  file.reset(new TestFSWritableFile(
      fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
  std::unique_ptr<WritableFileWriter> file_writer;
  ImmutableOptions ioptions(options);
  file_writer.reset(new WritableFileWriter(
      std::move(file), fname, file_options, SystemClock::Default().get(),
      nullptr, ioptions.stats, ioptions.listeners,
      ioptions.file_checksum_gen_factory.get(), true, true));

  Random rnd(301);
  std::string data = rnd.RandomString(1000);
  uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size());
  fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);

  ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
  ASSERT_OK(file_writer->Flush());
  Random size_r(47);
  for (int i = 0; i < 2000; i++) {
    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
    data_crc32c = crc32c::Value(data.c_str(), data.size());
    ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));

    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
    ASSERT_OK(file_writer->Append(Slice(data.c_str())));
    ASSERT_OK(file_writer->Flush());
  }
  ASSERT_OK(file_writer->Close());
  Destroy(options);
}

TEST_F(DBWritableFileWriterTest, AppendVerifyNoChecksum) {
  FileOptions file_options = FileOptions();
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  DestroyAndReopen(options);
  std::string fname = dbname_ + "/test_file";
  std::unique_ptr<FSWritableFile> writable_file_ptr;
  ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
                                       /*dbg*/ nullptr));
  std::unique_ptr<TestFSWritableFile> file;
  file.reset(new TestFSWritableFile(
      fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
  std::unique_ptr<WritableFileWriter> file_writer;
  ImmutableOptions ioptions(options);
  // Enable checksum handoff for this file, but do not enable buffer checksum.
  // So Append with checksum logic will not be triggered
  file_writer.reset(new WritableFileWriter(
      std::move(file), fname, file_options, SystemClock::Default().get(),
      nullptr, ioptions.stats, ioptions.listeners,
      ioptions.file_checksum_gen_factory.get(), true, false));

  Random rnd(301);
  std::string data = rnd.RandomString(1000);
  uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size());
  fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);

  ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
  ASSERT_OK(file_writer->Flush());
  Random size_r(47);
  for (int i = 0; i < 1000; i++) {
    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
    data_crc32c = crc32c::Value(data.c_str(), data.size());
    ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));

    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
    ASSERT_OK(file_writer->Append(Slice(data.c_str())));
    ASSERT_OK(file_writer->Flush());
  }
  ASSERT_OK(file_writer->Close());
  Destroy(options);
}

TEST_F(DBWritableFileWriterTest, AppendWithChecksumRateLimiter) {
  FileOptions file_options = FileOptions();
  file_options.rate_limiter = nullptr;
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  DestroyAndReopen(options);
  std::string fname = dbname_ + "/test_file";
  std::unique_ptr<FSWritableFile> writable_file_ptr;
  ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
                                       /*dbg*/ nullptr));
  std::unique_ptr<TestFSWritableFile> file;
  file.reset(new TestFSWritableFile(
      fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
  std::unique_ptr<WritableFileWriter> file_writer;
  ImmutableOptions ioptions(options);
  // Enable checksum handoff for this file, but do not enable buffer checksum.
  // So Append with checksum logic will not be triggered
  file_writer.reset(new WritableFileWriter(
      std::move(file), fname, file_options, SystemClock::Default().get(),
      nullptr, ioptions.stats, ioptions.listeners,
      ioptions.file_checksum_gen_factory.get(), true, true));
  fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);

  Random rnd(301);
  std::string data;
  uint32_t data_crc32c;
  uint64_t start = fault_env_->NowMicros();
  Random size_r(47);
  uint64_t bytes_written = 0;
  for (int i = 0; i < 100; i++) {
    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
    data_crc32c = crc32c::Value(data.c_str(), data.size());
    ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
    bytes_written += static_cast<uint64_t>(data.size());

    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
    ASSERT_OK(file_writer->Append(Slice(data.c_str())));
    ASSERT_OK(file_writer->Flush());
    bytes_written += static_cast<uint64_t>(data.size());
  }
  uint64_t elapsed = fault_env_->NowMicros() - start;
  double raw_rate = bytes_written * 1000000.0 / elapsed;
  ASSERT_OK(file_writer->Close());

  // Set the rate-limiter
  FileOptions file_options1 = FileOptions();
  file_options1.rate_limiter =
      NewGenericRateLimiter(static_cast<int64_t>(0.5 * raw_rate));
  fname = dbname_ + "/test_file_1";
  std::unique_ptr<FSWritableFile> writable_file_ptr1;
  ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options1,
                                       &writable_file_ptr1,
                                       /*dbg*/ nullptr));
  file.reset(new TestFSWritableFile(
      fname, file_options1, std::move(writable_file_ptr1), fault_fs_.get()));
  // Enable checksum handoff for this file, but do not enable buffer checksum.
  // So Append with checksum logic will not be triggered
  file_writer.reset(new WritableFileWriter(
      std::move(file), fname, file_options1, SystemClock::Default().get(),
      nullptr, ioptions.stats, ioptions.listeners,
      ioptions.file_checksum_gen_factory.get(), true, true));

  for (int i = 0; i < 1000; i++) {
    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
    data_crc32c = crc32c::Value(data.c_str(), data.size());
    ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));

    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
    ASSERT_OK(file_writer->Append(Slice(data.c_str())));
    ASSERT_OK(file_writer->Flush());
  }
  ASSERT_OK(file_writer->Close());
  if (file_options1.rate_limiter != nullptr) {
    delete file_options1.rate_limiter;
  }

  Destroy(options);
}

#ifndef ROCKSDB_LITE
TEST_F(WritableFileWriterTest, AppendStatusReturn) {
  class FakeWF : public FSWritableFile {
   public:
    explicit FakeWF() : use_direct_io_(false), io_error_(false) {}

    bool use_direct_io() const override { return use_direct_io_; }

    using FSWritableFile::Append;
    IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
                    IODebugContext* /*dbg*/) override {
      if (io_error_) {
        return IOStatus::IOError("Fake IO error");
      }
      return IOStatus::OK();
    }
    using FSWritableFile::PositionedAppend;
    IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
                              const IOOptions& /*options*/,
                              IODebugContext* /*dbg*/) override {
      if (io_error_) {
        return IOStatus::IOError("Fake IO error");
      }
      return IOStatus::OK();
    }
    IOStatus Close(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Flush(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Sync(const IOOptions& /*options*/,
                  IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    void Setuse_direct_io(bool val) { use_direct_io_ = val; }
    void SetIOError(bool val) { io_error_ = val; }

   protected:
    bool use_direct_io_;
    bool io_error_;
  };
  std::unique_ptr<FakeWF> wf(new FakeWF());
  wf->Setuse_direct_io(true);
  std::unique_ptr<WritableFileWriter> writer(
      new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions()));

  ASSERT_OK(writer->Append(std::string(2 * kMb, 'a')));

  // Next call to WritableFile::Append() should fail
  FakeWF* fwf = static_cast<FakeWF*>(writer->writable_file());
  fwf->SetIOError(true);
  ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
}
#endif

class ReadaheadRandomAccessFileTest
    : public testing::Test,
      public testing::WithParamInterface<size_t> {
 public:
  static std::vector<size_t> GetReadaheadSizeList() {
    return {1lu << 12, 1lu << 16};
  }
  void SetUp() override {
    readahead_size_ = GetParam();
    scratch_.reset(new char[2 * readahead_size_]);
    ResetSourceStr();
  }
  ReadaheadRandomAccessFileTest() : control_contents_() {}
  std::string Read(uint64_t offset, size_t n) {
    Slice result;
    Status s = test_read_holder_->Read(offset, n, IOOptions(), &result,
                                       scratch_.get(), nullptr);
    EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
    return std::string(result.data(), result.size());
  }
  void ResetSourceStr(const std::string& str = "") {
    std::unique_ptr<FSWritableFile> sink(
        new test::StringSink(&control_contents_));
    std::unique_ptr<WritableFileWriter> write_holder(new WritableFileWriter(
        std::move(sink), "" /* don't care */, FileOptions()));
    Status s = write_holder->Append(Slice(str));
    EXPECT_OK(s);
    s = write_holder->Flush();
    EXPECT_OK(s);
    std::unique_ptr<FSRandomAccessFile> read_holder(
        new test::StringSource(control_contents_));
    test_read_holder_ =
        NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
  }
  size_t GetReadaheadSize() const { return readahead_size_; }

 private:
  size_t readahead_size_;
  Slice control_contents_;
  std::unique_ptr<FSRandomAccessFile> test_read_holder_;
  std::unique_ptr<char[]> scratch_;
};

TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStr) {
  ASSERT_EQ("", Read(0, 1));
  ASSERT_EQ("", Read(0, 0));
  ASSERT_EQ("", Read(13, 13));
}

TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSize) {
  std::string str = "abcdefghijklmnopqrs";
  ResetSourceStr(str);
  ASSERT_EQ(str.substr(3, 4), Read(3, 4));
  ASSERT_EQ(str.substr(0, 3), Read(0, 3));
  ASSERT_EQ(str, Read(0, str.size()));
  ASSERT_EQ(str.substr(7, std::min(static_cast<int>(str.size()) - 7, 30)),
            Read(7, 30));
  ASSERT_EQ("", Read(100, 100));
}

TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenGreaterThanReadaheadSize) {
  Random rng(42);
  for (int k = 0; k < 100; ++k) {
    size_t strLen = k * GetReadaheadSize() +
                    rng.Uniform(static_cast<int>(GetReadaheadSize()));
    std::string str = rng.HumanReadableString(static_cast<int>(strLen));
    ResetSourceStr(str);
    for (int test = 1; test <= 100; ++test) {
      size_t offset = rng.Uniform(static_cast<int>(strLen));
      size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
      ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
                Read(offset, n));
    }
  }
}

TEST_P(ReadaheadRandomAccessFileTest, ReadExceedsReadaheadSize) {
  Random rng(7);
  size_t strLen = 4 * GetReadaheadSize() +
                  rng.Uniform(static_cast<int>(GetReadaheadSize()));
  std::string str = rng.HumanReadableString(static_cast<int>(strLen));
  ResetSourceStr(str);
  for (int test = 1; test <= 100; ++test) {
    size_t offset = rng.Uniform(static_cast<int>(strLen));
    size_t n =
        GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize()));
    ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
              Read(offset, n));
  }
}

INSTANTIATE_TEST_CASE_P(
    EmptySourceStr, ReadaheadRandomAccessFileTest,
    ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
    SourceStrLenLessThanReadaheadSize, ReadaheadRandomAccessFileTest,
    ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
    SourceStrLenGreaterThanReadaheadSize, ReadaheadRandomAccessFileTest,
    ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
    ReadExceedsReadaheadSize, ReadaheadRandomAccessFileTest,
    ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));

class ReadaheadSequentialFileTest : public testing::Test,
                                    public testing::WithParamInterface<size_t> {
 public:
  static std::vector<size_t> GetReadaheadSizeList() {
    return {1lu << 8, 1lu << 12, 1lu << 16, 1lu << 18};
  }
  void SetUp() override {
    readahead_size_ = GetParam();
    scratch_.reset(new char[2 * readahead_size_]);
    ResetSourceStr();
  }
  ReadaheadSequentialFileTest() {}
  std::string Read(size_t n) {
    Slice result;
    Status s = test_read_holder_->Read(n, &result, scratch_.get());
    EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
    return std::string(result.data(), result.size());
  }
  void Skip(size_t n) { test_read_holder_->Skip(n); }
  void ResetSourceStr(const std::string& str = "") {
    auto read_holder = std::unique_ptr<FSSequentialFile>(
        new test::SeqStringSource(str, &seq_read_count_));
    test_read_holder_.reset(new SequentialFileReader(std::move(read_holder),
                                                     "test", readahead_size_));
  }
  size_t GetReadaheadSize() const { return readahead_size_; }

 private:
  size_t readahead_size_;
  std::unique_ptr<SequentialFileReader> test_read_holder_;
  std::unique_ptr<char[]> scratch_;
  std::atomic<int> seq_read_count_;
};

TEST_P(ReadaheadSequentialFileTest, EmptySourceStr) {
  ASSERT_EQ("", Read(0));
  ASSERT_EQ("", Read(1));
  ASSERT_EQ("", Read(13));
}

TEST_P(ReadaheadSequentialFileTest, SourceStrLenLessThanReadaheadSize) {
  std::string str = "abcdefghijklmnopqrs";
  ResetSourceStr(str);
  ASSERT_EQ(str.substr(0, 3), Read(3));
  ASSERT_EQ(str.substr(3, 1), Read(1));
  ASSERT_EQ(str.substr(4), Read(str.size()));
  ASSERT_EQ("", Read(100));
}

TEST_P(ReadaheadSequentialFileTest, SourceStrLenGreaterThanReadaheadSize) {
  Random rng(42);
  for (int s = 0; s < 1; ++s) {
    for (int k = 0; k < 100; ++k) {
      size_t strLen = k * GetReadaheadSize() +
                      rng.Uniform(static_cast<int>(GetReadaheadSize()));
      std::string str = rng.HumanReadableString(static_cast<int>(strLen));
      ResetSourceStr(str);
      size_t offset = 0;
      for (int test = 1; test <= 100; ++test) {
        size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
        if (s && test % 2) {
          Skip(n);
        } else {
          ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
        }
        offset = std::min(offset + n, strLen);
      }
    }
  }
}

TEST_P(ReadaheadSequentialFileTest, ReadExceedsReadaheadSize) {
  Random rng(42);
  for (int s = 0; s < 1; ++s) {
    for (int k = 0; k < 100; ++k) {
      size_t strLen = k * GetReadaheadSize() +
                      rng.Uniform(static_cast<int>(GetReadaheadSize()));
      std::string str = rng.HumanReadableString(static_cast<int>(strLen));
      ResetSourceStr(str);
      size_t offset = 0;
      for (int test = 1; test <= 100; ++test) {
        size_t n = GetReadaheadSize() +
                   rng.Uniform(static_cast<int>(GetReadaheadSize()));
        if (s && test % 2) {
          Skip(n);
        } else {
          ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
        }
        offset = std::min(offset + n, strLen);
      }
    }
  }
}

INSTANTIATE_TEST_CASE_P(
    EmptySourceStr, ReadaheadSequentialFileTest,
    ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
    SourceStrLenLessThanReadaheadSize, ReadaheadSequentialFileTest,
    ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
    SourceStrLenGreaterThanReadaheadSize, ReadaheadSequentialFileTest,
    ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
    ReadExceedsReadaheadSize, ReadaheadSequentialFileTest,
    ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));

namespace {
std::string GenerateLine(int n) {
  std::string rv;
  // Multiples of 17 characters per line, for likely bad buffer alignment
  for (int i = 0; i < n; ++i) {
    rv.push_back(static_cast<char>('0' + (i % 10)));
    rv.append("xxxxxxxxxxxxxxxx");
  }
  return rv;
}
}  // namespace

TEST(LineFileReaderTest, LineFileReaderTest) {
  const int nlines = 1000;

  std::unique_ptr<Env> mem_env(MockEnv::Create(Env::Default()));
  std::shared_ptr<FileSystem> fs = mem_env->GetFileSystem();
  // Create an input file
  {
    std::unique_ptr<FSWritableFile> file;
    ASSERT_OK(
        fs->NewWritableFile("testfile", FileOptions(), &file, /*dbg*/ nullptr));

    for (int i = 0; i < nlines; ++i) {
      std::string line = GenerateLine(i);
      line.push_back('\n');
      ASSERT_OK(file->Append(line, IOOptions(), /*dbg*/ nullptr));
    }
  }

  // Verify with no I/O errors
  {
    std::unique_ptr<LineFileReader> reader;
    ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
                                     nullptr));
    std::string line;
    int count = 0;
    while (reader->ReadLine(&line)) {
      ASSERT_EQ(line, GenerateLine(count));
      ++count;
      ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
    }
    ASSERT_OK(reader->GetStatus());
    ASSERT_EQ(count, nlines);
    ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
    // And still
    ASSERT_FALSE(reader->ReadLine(&line));
    ASSERT_OK(reader->GetStatus());
    ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
  }

  // Verify with injected I/O error
  {
    std::unique_ptr<LineFileReader> reader;
    ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
                                     nullptr));
    std::string line;
    int count = 0;
    // Read part way through the file
    while (count < nlines / 4) {
      ASSERT_TRUE(reader->ReadLine(&line));
      ASSERT_EQ(line, GenerateLine(count));
      ++count;
      ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
    }
    ASSERT_OK(reader->GetStatus());

    // Inject error
    int callback_count = 0;
    SyncPoint::GetInstance()->SetCallBack(
        "MemFile::Read:IOStatus", [&](void* arg) {
          IOStatus* status = static_cast<IOStatus*>(arg);
          *status = IOStatus::Corruption("test");
          ++callback_count;
        });
    SyncPoint::GetInstance()->EnableProcessing();

    while (reader->ReadLine(&line)) {
      ASSERT_EQ(line, GenerateLine(count));
      ++count;
      ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
    }
    ASSERT_TRUE(reader->GetStatus().IsCorruption());
    ASSERT_LT(count, nlines / 2);
    ASSERT_EQ(callback_count, 1);

    // Still get error & no retry
    ASSERT_FALSE(reader->ReadLine(&line));
    ASSERT_TRUE(reader->GetStatus().IsCorruption());
    ASSERT_EQ(callback_count, 1);

    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
  }
}

#ifndef ROCKSDB_LITE
class IOErrorEventListener : public EventListener {
 public:
  IOErrorEventListener() { notify_error_.store(0); }

  void OnIOError(const IOErrorInfo& io_error_info) override {
    notify_error_++;
    EXPECT_FALSE(io_error_info.file_path.empty());
    EXPECT_FALSE(io_error_info.io_status.ok());
  }

  size_t NotifyErrorCount() { return notify_error_; }

  bool ShouldBeNotifiedOnFileIO() override { return true; }

 private:
  std::atomic<size_t> notify_error_;
};

TEST_F(DBWritableFileWriterTest, IOErrorNotification) {
  class FakeWF : public FSWritableFile {
   public:
    explicit FakeWF() : io_error_(false) {
      file_append_errors_.store(0);
      file_flush_errors_.store(0);
    }

    using FSWritableFile::Append;
    IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
                    IODebugContext* /*dbg*/) override {
      if (io_error_) {
        file_append_errors_++;
        return IOStatus::IOError("Fake IO error");
      }
      return IOStatus::OK();
    }

    using FSWritableFile::PositionedAppend;
    IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
                              const IOOptions& /*options*/,
                              IODebugContext* /*dbg*/) override {
      if (io_error_) {
        return IOStatus::IOError("Fake IO error");
      }
      return IOStatus::OK();
    }
    IOStatus Close(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Flush(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      if (io_error_) {
        file_flush_errors_++;
        return IOStatus::IOError("Fake IO error");
      }
      return IOStatus::OK();
    }
    IOStatus Sync(const IOOptions& /*options*/,
                  IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }

    void SetIOError(bool val) { io_error_ = val; }

    void CheckCounters(int file_append_errors, int file_flush_errors) {
      ASSERT_EQ(file_append_errors, file_append_errors_);
      ASSERT_EQ(file_flush_errors_, file_flush_errors);
    }

   protected:
    bool io_error_;
    std::atomic<size_t> file_append_errors_;
    std::atomic<size_t> file_flush_errors_;
  };

  FileOptions file_options = FileOptions();
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  IOErrorEventListener* listener = new IOErrorEventListener();
  options.listeners.emplace_back(listener);

  DestroyAndReopen(options);
  ImmutableOptions ioptions(options);

  std::string fname = dbname_ + "/test_file";
  std::unique_ptr<FakeWF> writable_file_ptr(new FakeWF);

  std::unique_ptr<WritableFileWriter> file_writer;
  writable_file_ptr->SetIOError(true);

  file_writer.reset(new WritableFileWriter(
      std::move(writable_file_ptr), fname, file_options,
      SystemClock::Default().get(), nullptr, ioptions.stats, ioptions.listeners,
      ioptions.file_checksum_gen_factory.get(), true, true));

  FakeWF* fwf = static_cast<FakeWF*>(file_writer->writable_file());

  fwf->SetIOError(true);
  ASSERT_NOK(file_writer->Append(std::string(2 * kMb, 'a')));
  fwf->CheckCounters(1, 0);
  ASSERT_EQ(listener->NotifyErrorCount(), 1);

  fwf->SetIOError(true);
  ASSERT_NOK(file_writer->Flush());
  fwf->CheckCounters(1, 1);
  ASSERT_EQ(listener->NotifyErrorCount(), 2);

  /* No error generation */
  fwf->SetIOError(false);
  ASSERT_OK(file_writer->Append(std::string(2 * kMb, 'b')));
  ASSERT_EQ(listener->NotifyErrorCount(), 2);
  fwf->CheckCounters(1, 1);
}
#endif  // ROCKSDB_LITE

class WritableFileWriterIOPriorityTest : public testing::Test {
 protected:
  // This test is to check whether the rate limiter priority can be passed
  // correctly from WritableFileWriter functions to FSWritableFile functions.

  void SetUp() override {
    // When op_rate_limiter_priority parameter in WritableFileWriter functions
    // is the default (Env::IO_TOTAL).
    std::unique_ptr<FakeWF> wf{new FakeWF(Env::IO_HIGH)};
    FileOptions file_options;
    writer_.reset(new WritableFileWriter(std::move(wf), "" /* don't care */,
                                         file_options));
  }

  class FakeWF : public FSWritableFile {
   public:
    explicit FakeWF(Env::IOPriority io_priority) { SetIOPriority(io_priority); }
    ~FakeWF() override {}

    IOStatus Append(const Slice& /*data*/, const IOOptions& options,
                    IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
      return IOStatus::OK();
    }
    IOStatus Append(const Slice& data, const IOOptions& options,
                    const DataVerificationInfo& /* verification_info */,
                    IODebugContext* dbg) override {
      return Append(data, options, dbg);
    }
    IOStatus PositionedAppend(const Slice& /*data*/, uint64_t /*offset*/,
                              const IOOptions& options,
                              IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
      return IOStatus::OK();
    }
    IOStatus PositionedAppend(
        const Slice& /* data */, uint64_t /* offset */,
        const IOOptions& options,
        const DataVerificationInfo& /* verification_info */,
        IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
      return IOStatus::OK();
    }
    IOStatus Truncate(uint64_t /*size*/, const IOOptions& options,
                      IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
      return IOStatus::OK();
    }
    IOStatus Close(const IOOptions& options, IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
      return IOStatus::OK();
    }
    IOStatus Flush(const IOOptions& options, IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
      return IOStatus::OK();
    }
    IOStatus Sync(const IOOptions& options, IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
      return IOStatus::OK();
    }
    IOStatus Fsync(const IOOptions& options, IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
      return IOStatus::OK();
    }
    uint64_t GetFileSize(const IOOptions& options,
                         IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
      return 0;
    }
    void GetPreallocationStatus(size_t* /*block_size*/,
                                size_t* /*last_allocated_block*/) override {}
    size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
      return 0;
    }
    IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
      return IOStatus::OK();
    }

    IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
                      const IOOptions& options,
                      IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
      return IOStatus::OK();
    }
    IOStatus RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/,
                       const IOOptions& options,
                       IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
      return IOStatus::OK();
    }

    void PrepareWrite(size_t /*offset*/, size_t /*len*/,
                      const IOOptions& options,
                      IODebugContext* /*dbg*/) override {
      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
    }

    bool IsSyncThreadSafe() const override { return true; }
  };

  std::unique_ptr<WritableFileWriter> writer_;
};

TEST_F(WritableFileWriterIOPriorityTest, Append) {
  ASSERT_OK(writer_->Append(Slice("abc")));
}

TEST_F(WritableFileWriterIOPriorityTest, Pad) { ASSERT_OK(writer_->Pad(500)); }

TEST_F(WritableFileWriterIOPriorityTest, Flush) { ASSERT_OK(writer_->Flush()); }

TEST_F(WritableFileWriterIOPriorityTest, Close) { ASSERT_OK(writer_->Close()); }

TEST_F(WritableFileWriterIOPriorityTest, Sync) {
  ASSERT_OK(writer_->Sync(false));
  ASSERT_OK(writer_->Sync(true));
}

TEST_F(WritableFileWriterIOPriorityTest, SyncWithoutFlush) {
  ASSERT_OK(writer_->SyncWithoutFlush(false));
  ASSERT_OK(writer_->SyncWithoutFlush(true));
}

TEST_F(WritableFileWriterIOPriorityTest, BasicOp) {
  EnvOptions env_options;
  env_options.bytes_per_sync = kMb;
  std::unique_ptr<FakeWF> wf(new FakeWF(Env::IO_HIGH));
  std::unique_ptr<WritableFileWriter> writer(
      new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
  Random r(301);
  Status s;
  std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
  for (int i = 0; i < 1000; i++) {
    int skew_limit = (i < 700) ? 10 : 15;
    uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100);
    s = writer->Append(Slice(large_buf.get(), num));
    ASSERT_OK(s);

    // Flush in a chance of 1/10.
    if (r.Uniform(10) == 0) {
      s = writer->Flush();
      ASSERT_OK(s);
    }
  }
  s = writer->Close();
  ASSERT_OK(s);
}
}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}