//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE

#include "utilities/persistent_cache/block_cache_tier.h"

#include <regex>
#include <utility>
#include <vector>

#include "logging/logging.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/stop_watch.h"
#include "utilities/persistent_cache/block_cache_tier_file.h"

namespace ROCKSDB_NAMESPACE {

//
// BlockCacheImpl
//
Status BlockCacheTier::Open() {
  Status status;

  WriteLock _(&lock_);

  assert(!size_);

  // Check the validity of the options
  status = opt_.ValidateSettings();
  assert(status.ok());
  if (!status.ok()) {
    Error(opt_.log, "Invalid block cache options");
    return status;
  }

  // Create base directory or cleanup existing directory
  status = opt_.env->CreateDirIfMissing(opt_.path);
  if (!status.ok()) {
    Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
          status.ToString().c_str());
    return status;
  }

  // Create base/<cache dir> directory
  status = opt_.env->CreateDir(GetCachePath());
  if (!status.ok()) {
    // directory already exists, clean it up
    status = CleanupCacheFolder(GetCachePath());
    assert(status.ok());
    if (!status.ok()) {
      Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
            status.ToString().c_str());
      return status;
    }
  }

  // create a new file
  assert(!cache_file_);
  status = NewCacheFile();
  if (!status.ok()) {
    Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(),
          status.ToString().c_str());
    return status;
  }

  assert(cache_file_);

  if (opt_.pipeline_writes) {
    assert(!insert_th_.joinable());
    insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this);
  }

  return Status::OK();
}

bool IsCacheFile(const std::string& file) {
  // check if the file has .rc suffix
  // Unfortunately regex support across compilers is not even, so we use simple
  // string parsing
  size_t pos = file.find(".");
  if (pos == std::string::npos) {
    return false;
  }

  std::string suffix = file.substr(pos);
  return suffix == ".rc";
}

Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {
  std::vector<std::string> files;
  Status status = opt_.env->GetChildren(folder, &files);
  if (!status.ok()) {
    Error(opt_.log, "Error getting files for %s. %s", folder.c_str(),
          status.ToString().c_str());
    return status;
  }

  // cleanup files with the patter :digi:.rc
  for (auto file : files) {
    if (IsCacheFile(file)) {
      // cache file
      Info(opt_.log, "Removing file %s.", file.c_str());
      status = opt_.env->DeleteFile(folder + "/" + file);
      if (!status.ok()) {
        Error(opt_.log, "Error deleting file %s. %s", file.c_str(),
              status.ToString().c_str());
        return status;
      }
    } else {
      ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str());
    }
  }
  return Status::OK();
}

Status BlockCacheTier::Close() {
  // stop the insert thread
  if (opt_.pipeline_writes && insert_th_.joinable()) {
    InsertOp op(/*quit=*/true);
    insert_ops_.Push(std::move(op));
    insert_th_.join();
  }

  // stop the writer before
  writer_.Stop();

  // clear all metadata
  WriteLock _(&lock_);
  metadata_.Clear();
  return Status::OK();
}

template<class T>
void Add(std::map<std::string, double>* stats, const std::string& key,
         const T& t) {
  stats->insert({key, static_cast<double>(t)});
}

PersistentCache::StatsType BlockCacheTier::Stats() {
  std::map<std::string, double> stats;
  Add(&stats, "persistentcache.blockcachetier.bytes_piplined",
      stats_.bytes_pipelined_.Average());
  Add(&stats, "persistentcache.blockcachetier.bytes_written",
      stats_.bytes_written_.Average());
  Add(&stats, "persistentcache.blockcachetier.bytes_read",
      stats_.bytes_read_.Average());
  Add(&stats, "persistentcache.blockcachetier.insert_dropped",
      stats_.insert_dropped_);
  Add(&stats, "persistentcache.blockcachetier.cache_hits",
      stats_.cache_hits_);
  Add(&stats, "persistentcache.blockcachetier.cache_misses",
      stats_.cache_misses_);
  Add(&stats, "persistentcache.blockcachetier.cache_errors",
      stats_.cache_errors_);
  Add(&stats, "persistentcache.blockcachetier.cache_hits_pct",
      stats_.CacheHitPct());
  Add(&stats, "persistentcache.blockcachetier.cache_misses_pct",
      stats_.CacheMissPct());
  Add(&stats, "persistentcache.blockcachetier.read_hit_latency",
      stats_.read_hit_latency_.Average());
  Add(&stats, "persistentcache.blockcachetier.read_miss_latency",
      stats_.read_miss_latency_.Average());
  Add(&stats, "persistentcache.blockcachetier.write_latency",
      stats_.write_latency_.Average());

  auto out = PersistentCacheTier::Stats();
  out.push_back(stats);
  return out;
}

Status BlockCacheTier::Insert(const Slice& key, const char* data,
                              const size_t size) {
  // update stats
  stats_.bytes_pipelined_.Add(size);

  if (opt_.pipeline_writes) {
    // off load the write to the write thread
    insert_ops_.Push(
        InsertOp(key.ToString(), std::move(std::string(data, size))));
    return Status::OK();
  }

  assert(!opt_.pipeline_writes);
  return InsertImpl(key, Slice(data, size));
}

void BlockCacheTier::InsertMain() {
  while (true) {
    InsertOp op(insert_ops_.Pop());

    if (op.signal_) {
      // that is a secret signal to exit
      break;
    }

    size_t retry = 0;
    Status s;
    while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {
      if (retry > kMaxRetry) {
        break;
      }

      // this can happen when the buffers are full, we wait till some buffers
      // are free. Why don't we wait inside the code. This is because we want
      // to support both pipelined and non-pipelined mode
      buffer_allocator_.WaitUntilUsable();
      retry++;
    }

    if (!s.ok()) {
      stats_.insert_dropped_++;
    }
  }
}

Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
  // pre-condition
  assert(key.size());
  assert(data.size());
  assert(cache_file_);

  StopWatchNano timer(opt_.env, /*auto_start=*/ true);

  WriteLock _(&lock_);

  LBA lba;
  if (metadata_.Lookup(key, &lba)) {
    // the key already exists, this is duplicate insert
    return Status::OK();
  }

  while (!cache_file_->Append(key, data, &lba)) {
    if (!cache_file_->Eof()) {
      ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d",
                      cache_file_->cacheid());
      stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
      return Status::TryAgain();
    }

    assert(cache_file_->Eof());
    Status status = NewCacheFile();
    if (!status.ok()) {
      return status;
    }
  }

  // Insert into lookup index
  BlockInfo* info = metadata_.Insert(key, lba);
  assert(info);
  if (!info) {
    return Status::IOError("Unexpected error inserting to index");
  }

  // insert to cache file reverse mapping
  cache_file_->Add(info);

  // update stats
  stats_.bytes_written_.Add(data.size());
  stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
  return Status::OK();
}

Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val,
                              size_t* size) {
  StopWatchNano timer(opt_.env, /*auto_start=*/ true);

  LBA lba;
  bool status;
  status = metadata_.Lookup(key, &lba);
  if (!status) {
    stats_.cache_misses_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: key not found");
  }

  BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);
  if (!file) {
    // this can happen because the block index and cache file index are
    // different, and the cache file might be removed between the two lookups
    stats_.cache_misses_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: cache file not found");
  }

  assert(file->refs_);

  std::unique_ptr<char[]> scratch(new char[lba.size_]);
  Slice blk_key;
  Slice blk_val;

  status = file->Read(lba, &blk_key, &blk_val, scratch.get());
  --file->refs_;
  if (!status) {
    stats_.cache_misses_++;
    stats_.cache_errors_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: error reading data");
  }

  assert(blk_key == key);

  val->reset(new char[blk_val.size()]);
  memcpy(val->get(), blk_val.data(), blk_val.size());
  *size = blk_val.size();

  stats_.bytes_read_.Add(*size);
  stats_.cache_hits_++;
  stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);

  return Status::OK();
}

bool BlockCacheTier::Erase(const Slice& key) {
  WriteLock _(&lock_);
  BlockInfo* info = metadata_.Remove(key);
  assert(info);
  delete info;
  return true;
}

Status BlockCacheTier::NewCacheFile() {
  lock_.AssertHeld();

  TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
                           (void*)(GetCachePath().c_str()));

  std::unique_ptr<WriteableCacheFile> f(
    new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
                           GetCachePath(), writer_cache_id_,
                           opt_.cache_file_size, opt_.log));

  bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
  if (!status) {
    return Status::IOError("Error creating file");
  }

  Info(opt_.log, "Created cache file %d", writer_cache_id_);

  writer_cache_id_++;
  cache_file_ = f.release();

  // insert to cache files tree
  status = metadata_.Insert(cache_file_);
  assert(status);
  if (!status) {
    Error(opt_.log, "Error inserting to metadata");
    return Status::IOError("Error inserting to metadata");
  }

  return Status::OK();
}

bool BlockCacheTier::Reserve(const size_t size) {
  WriteLock _(&lock_);
  assert(size_ <= opt_.cache_size);

  if (size + size_ <= opt_.cache_size) {
    // there is enough space to write
    size_ += size;
    return true;
  }

  assert(size + size_ >= opt_.cache_size);
  // there is not enough space to fit the requested data
  // we can clear some space by evicting cold data

  const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);
  while (size + size_ > opt_.cache_size * retain_fac) {
    std::unique_ptr<BlockCacheFile> f(metadata_.Evict());
    if (!f) {
      // nothing is evictable
      return false;
    }
    assert(!f->refs_);
    uint64_t file_size;
    if (!f->Delete(&file_size).ok()) {
      // unable to delete file
      return false;
    }

    assert(file_size <= size_);
    size_ -= file_size;
  }

  size_ += size;
  assert(size_ <= opt_.cache_size * 0.9);
  return true;
}

Status NewPersistentCache(Env* const env, const std::string& path,
                          const uint64_t size,
                          const std::shared_ptr<Logger>& log,
                          const bool optimized_for_nvm,
                          std::shared_ptr<PersistentCache>* cache) {
  if (!cache) {
    return Status::IOError("invalid argument cache");
  }

  auto opt = PersistentCacheConfig(env, path, size, log);
  if (optimized_for_nvm) {
    // the default settings are optimized for SSD
    // NVM devices are better accessed with 4K direct IO and written with
    // parallelism
    opt.enable_direct_writes = true;
    opt.writer_qdepth = 4;
    opt.writer_dispatch_size = 4 * 1024;
  }

  auto pcache = std::make_shared<BlockCacheTier>(opt);
  Status s = pcache->Open();

  if (!s.ok()) {
    return s;
  }

  *cache = pcache;
  return s;
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ifndef ROCKSDB_LITE