Fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb, for NextGraph and Oxigraph.

//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE

#include "utilities/persistent_cache/block_cache_tier_file.h"

#ifndef OS_WIN
#include <unistd.h>
#endif
#include <functional>
#include <memory>
#include <vector>

#include "env/composite_env_wrapper.h"
#include "logging/logging.h"
#include "port/port.h"
#include "rocksdb/system_clock.h"
#include "util/crc32c.h"

namespace ROCKSDB_NAMESPACE {

//
// File creation factories
//
Status NewWritableCacheFile(Env* const env, const std::string& filepath,
                            std::unique_ptr<WritableFile>* file,
                            const bool use_direct_writes = false) {
  EnvOptions opt;
  opt.use_direct_writes = use_direct_writes;
  Status s = env->NewWritableFile(filepath, file, opt);
  return s;
}

Status NewRandomAccessCacheFile(const std::shared_ptr<FileSystem>& fs,
                                const std::string& filepath,
                                std::unique_ptr<FSRandomAccessFile>* file,
                                const bool use_direct_reads = true) {
  assert(fs.get());

  FileOptions opt;
  opt.use_direct_reads = use_direct_reads;
  return fs->NewRandomAccessFile(filepath, opt, file, nullptr);
}
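
// Note: by default these factories open cache files without direct writes
// (use_direct_writes = false) and with direct reads (use_direct_reads = true).
// WriteableCacheFile::Create below ignores its enable_direct_writes argument
// and calls NewWritableCacheFile() with that default.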

//
// BlockCacheFile
//
Status BlockCacheFile::Delete(uint64_t* size) {
  assert(env_);

  Status status = env_->GetFileSize(Path(), size);
  if (!status.ok()) {
    return status;
  }
  return env_->DeleteFile(Path());
}

//
// CacheRecord
//
// Cache record represents the record on disk
//
// +--------+---------+----------+------------+---------------+-------------+
// | magic  | crc     | key size | value size | key data      | value data  |
// +--------+---------+----------+------------+---------------+-------------+
// <-- 4 --><-- 4  --><-- 4   --><-- 4     --><-- key size  --><-- v-size -->
//
struct CacheRecordHeader {
  CacheRecordHeader()
    : magic_(0), crc_(0), key_size_(0), val_size_(0) {}
  CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
                    const uint32_t val_size)
      : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}

  uint32_t magic_;
  uint32_t crc_;
  uint32_t key_size_;
  uint32_t val_size_;
};

struct CacheRecord {
  CacheRecord() {}
  CacheRecord(const Slice& key, const Slice& val)
      : hdr_(MAGIC, static_cast<uint32_t>(key.size()),
             static_cast<uint32_t>(val.size())),
        key_(key),
        val_(val) {
    hdr_.crc_ = ComputeCRC();
  }

  uint32_t ComputeCRC() const;
  bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff);
  bool Deserialize(const Slice& buf);

  static uint32_t CalcSize(const Slice& key, const Slice& val) {
    return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() +
                                 val.size());
  }

  static const uint32_t MAGIC = 0xfefa;

  bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
              const char* data, const size_t size);

  CacheRecordHeader hdr_;
  Slice key_;
  Slice val_;
};

static_assert(sizeof(CacheRecordHeader) == 16, "DataHeader is not aligned");
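
// Worked example of the on-disk layout above: the fixed header is four
// uint32_t fields (16 bytes), so a record holding an 8-byte key and a
// 4096-byte block occupies CalcSize(key, val) = 16 + 8 + 4096 = 4120 bytes.
// The CRC32C stored in the header covers the header (with crc_ zeroed),
// the key bytes and the value bytes, in that order.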

uint32_t CacheRecord::ComputeCRC() const {
  uint32_t crc = 0;
  CacheRecordHeader tmp = hdr_;
  tmp.crc_ = 0;
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp));
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()),
                       key_.size());
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()),
                       val_.size());
  return crc;
}

bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs,
                            size_t* woff) {
  assert(bufs->size());
  return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_),
                sizeof(hdr_)) &&
         Append(bufs, woff, reinterpret_cast<const char*>(key_.data()),
                key_.size()) &&
         Append(bufs, woff, reinterpret_cast<const char*>(val_.data()),
                val_.size());
}

bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
                         const char* data, const size_t data_size) {
  assert(*woff < bufs->size());

  const char* p = data;
  size_t size = data_size;

  while (size && *woff < bufs->size()) {
    CacheWriteBuffer* buf = (*bufs)[*woff];
    const size_t free = buf->Free();
    if (size <= free) {
      buf->Append(p, size);
      size = 0;
    } else {
      buf->Append(p, free);
      p += free;
      size -= free;
      assert(!buf->Free());
      assert(buf->Used() == buf->Capacity());
    }

    if (!buf->Free()) {
      *woff += 1;
    }
  }

  assert(!size);

  return !size;
}

bool CacheRecord::Deserialize(const Slice& data) {
  assert(data.size() >= sizeof(CacheRecordHeader));
  if (data.size() < sizeof(CacheRecordHeader)) {
    return false;
  }

  memcpy(&hdr_, data.data(), sizeof(hdr_));

  assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size());
  if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) {
    return false;
  }

  key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_);
  val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_);

  if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) {
    fprintf(stderr, "** magic %d ** \n", hdr_.magic_);
    fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_);
    fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_);
    fprintf(stderr, "** key %s ** \n", key_.ToString().c_str());
    fprintf(stderr, "** val %s ** \n", val_.ToString().c_str());
    for (size_t i = 0; i < hdr_.val_size_; ++i) {
      fprintf(stderr, "%d.", (uint8_t)val_.data()[i]);
    }
    fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC());
  }

  assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_);
  return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_;
}

//
// RandomAccessFile
//

bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
  WriteLock _(&rwlock_);
  return OpenImpl(enable_direct_reads);
}

bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
  rwlock_.AssertHeld();

  ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str());
  assert(env_);

  std::unique_ptr<FSRandomAccessFile> file;
  Status status = NewRandomAccessCacheFile(env_->GetFileSystem(), Path(), &file,
                                           enable_direct_reads);
  if (!status.ok()) {
    Error(log_, "Error opening random access file %s. %s", Path().c_str(),
          status.ToString().c_str());
    return false;
  }
  freader_.reset(new RandomAccessFileReader(std::move(file), Path(),
                                            env_->GetSystemClock().get()));

  return true;
}

bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
                                 char* scratch) {
  ReadLock _(&rwlock_);

  assert(lba.cache_id_ == cache_id_);

  if (!freader_) {
    return false;
  }

  Slice result;
  Status s = freader_->Read(IOOptions(), lba.off_, lba.size_, &result, scratch,
                            nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
  if (!s.ok()) {
    Error(log_, "Error reading from file %s. %s", Path().c_str(),
          s.ToString().c_str());
    return false;
  }

  assert(result.data() == scratch);

  return ParseRec(lba, key, val, scratch);
}

bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val,
                                     char* scratch) {
  Slice data(scratch, lba.size_);

  CacheRecord rec;
  if (!rec.Deserialize(data)) {
    assert(!"Error deserializing data");
    Error(log_, "Error de-serializing record from file %s off %d",
          Path().c_str(), lba.off_);
    return false;
  }

  *key = Slice(rec.key_);
  *val = Slice(rec.val_);

  return true;
}

//
// WriteableCacheFile
//

WriteableCacheFile::~WriteableCacheFile() {
  WriteLock _(&rwlock_);
  if (!eof_) {
    // This file never flushed. We give priority to shutdown since this is a
    // cache
    // TODO(krad): Figure a way to flush the pending data
    if (file_) {
      assert(refs_ == 1);
      --refs_;
    }
  }
  assert(!refs_);
  ClearBuffers();
}

bool WriteableCacheFile::Create(const bool /*enable_direct_writes*/,
                                const bool enable_direct_reads) {
  WriteLock _(&rwlock_);

  enable_direct_reads_ = enable_direct_reads;

  ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)",
                  Path().c_str(), max_size_);

  assert(env_);

  Status s = env_->FileExists(Path());
  if (s.ok()) {
    ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(),
                   s.ToString().c_str());
  }

  s = NewWritableCacheFile(env_, Path(), &file_);
  if (!s.ok()) {
    ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(),
                   s.ToString().c_str());
    return false;
  }

  assert(!refs_);
  ++refs_;

  return true;
}

bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) {
  WriteLock _(&rwlock_);

  if (eof_) {
    // We can't append since the file is full
    return false;
  }

  // estimate the space required to store the (key, val)
  uint32_t rec_size = CacheRecord::CalcSize(key, val);

  if (!ExpandBuffer(rec_size)) {
    // unable to expand the buffer
    ROCKS_LOG_DEBUG(log_, "Error expanding buffers. size=%d", rec_size);
    return false;
  }

  lba->cache_id_ = cache_id_;
  lba->off_ = disk_woff_;
  lba->size_ = rec_size;

  CacheRecord rec(key, val);
  if (!rec.Serialize(&bufs_, &buf_woff_)) {
    // unexpected error: unable to serialize the data
    assert(!"Error serializing record");
    return false;
  }

  disk_woff_ += rec_size;
  eof_ = disk_woff_ >= max_size_;

  // dispatch buffer for flush
  DispatchBuffer();

  return true;
}

bool WriteableCacheFile::ExpandBuffer(const size_t size) {
  rwlock_.AssertHeld();
  assert(!eof_);

  // determine if there is enough space
  size_t free = 0;  // compute the free space left in buffer
  for (size_t i = buf_woff_; i < bufs_.size(); ++i) {
    free += bufs_[i]->Free();
    if (size <= free) {
      // we have enough space in the buffer
      return true;
    }
  }

  // expand the buffer until there is enough space to write `size` bytes
  assert(free < size);
  assert(alloc_);

  while (free < size) {
    CacheWriteBuffer* const buf = alloc_->Allocate();
    if (!buf) {
      ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers");
      return false;
    }

    size_ += static_cast<uint32_t>(buf->Free());
    free += buf->Free();
    bufs_.push_back(buf);
  }

  assert(free >= size);
  return true;
}

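// Note on buffer-to-file mapping: each CacheWriteBuffer covers a fixed-size,
// aligned region of the file, so the buffer at index buf_doff_ is written at
// file offset buf_doff_ * alloc_->BufferSize(). For example, with a
// (hypothetical) 64 KiB buffer size, the third buffer (buf_doff_ == 2) lands
// at offset 131072.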
void WriteableCacheFile::DispatchBuffer() {
  rwlock_.AssertHeld();

  assert(bufs_.size());
  assert(buf_doff_ <= buf_woff_);
  assert(buf_woff_ <= bufs_.size());

  if (pending_ios_) {
    return;
  }

  if (!eof_ && buf_doff_ == buf_woff_) {
    // dispatch buffer is pointing to write buffer and we haven't hit eof
    return;
  }

  assert(eof_ || buf_doff_ < buf_woff_);
  assert(buf_doff_ < bufs_.size());
  assert(file_);
  assert(alloc_);

  auto* buf = bufs_[buf_doff_];
  const uint64_t file_off = buf_doff_ * alloc_->BufferSize();

  assert(!buf->Free() ||
         (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size()));
  // we have reached end of file, and there is space in the last buffer
  // pad it with zero for direct IO
  buf->FillTrailingZeros();

  assert(buf->Used() % kFileAlignmentSize == 0);

  writer_->Write(file_.get(), buf, file_off,
                 std::bind(&WriteableCacheFile::BufferWriteDone, this));
  pending_ios_++;
  buf_doff_++;
}

void WriteableCacheFile::BufferWriteDone() {
  WriteLock _(&rwlock_);

  assert(bufs_.size());

  pending_ios_--;

  if (buf_doff_ < bufs_.size()) {
    DispatchBuffer();
  }

  if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) {
    // end-of-file reached, move to read mode
    CloseAndOpenForReading();
  }
}

void WriteableCacheFile::CloseAndOpenForReading() {
  // Our env abstraction does not allow reading from a file opened for
  // appending. We need to close the file and re-open it for reading.
  Close();
  RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
}

bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
                                    char* scratch) {
  rwlock_.AssertHeld();

  if (!ReadBuffer(lba, scratch)) {
    Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_,
          lba.off_);
    return false;
  }

  return ParseRec(lba, key, block, scratch);
}

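// Illustration of the flat-stream addressing used by ReadBuffer() below:
// assuming a (hypothetical) 64 KiB buffer size, an LBA offset of 150000 maps
// to start_idx = 150000 / 65536 = 2 and start_off = 150000 % 65536 = 18928,
// i.e. the read starts 18928 bytes into the third buffer and continues into
// the following buffers until lba.size_ bytes have been copied.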
bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) {
  rwlock_.AssertHeld();

  assert(lba.off_ < disk_woff_);
  assert(alloc_);

  // We read from the buffers like reading from a flat file. The list of
  // buffers is treated as a contiguous stream of data.

  char* tmp = data;
  size_t pending_nbytes = lba.size_;
  // start buffer
  size_t start_idx = lba.off_ / alloc_->BufferSize();
  // offset into the start buffer
  size_t start_off = lba.off_ % alloc_->BufferSize();

  assert(start_idx <= buf_woff_);

  for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) {
    assert(i <= buf_woff_);
    auto* buf = bufs_[i];
    assert(i == buf_woff_ || !buf->Free());
    // bytes to write to the buffer
    size_t nbytes = pending_nbytes > (buf->Used() - start_off)
                        ? (buf->Used() - start_off)
                        : pending_nbytes;
    memcpy(tmp, buf->Data() + start_off, nbytes);

    // left over to be written
    pending_nbytes -= nbytes;
    start_off = 0;
    tmp += nbytes;
  }

  assert(!pending_nbytes);
  if (pending_nbytes) {
    return false;
  }

  assert(tmp == data + lba.size_);
  return true;
}

void WriteableCacheFile::Close() {
  rwlock_.AssertHeld();

  assert(size_ >= max_size_);
  assert(disk_woff_ >= max_size_);
  assert(buf_doff_ == bufs_.size());
  assert(bufs_.size() - buf_woff_ <= 1);
  assert(!pending_ios_);

  Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_,
       disk_woff_);

  ClearBuffers();
  file_.reset();

  assert(refs_);
  --refs_;
}

void WriteableCacheFile::ClearBuffers() {
  assert(alloc_);

  for (size_t i = 0; i < bufs_.size(); ++i) {
    alloc_->Deallocate(bufs_[i]);
  }

  bufs_.clear();
}

//
// ThreadedFileWriter implementation
//
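// The writer is a simple producer/consumer pipeline: Write() enqueues an IO
// on q_, the qdepth worker threads started below pop IOs in ThreadMain(),
// reserve space in the cache tier, append the buffer to the file in
// DispatchIO() and then invoke the completion callback (here,
// WriteableCacheFile::BufferWriteDone). Stop() pushes one signal IO per
// thread and joins them.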
ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
                               const size_t qdepth, const size_t io_size)
    : Writer(cache), io_size_(io_size) {
  for (size_t i = 0; i < qdepth; ++i) {
    port::Thread th(&ThreadedWriter::ThreadMain, this);
    threads_.push_back(std::move(th));
  }
}

void ThreadedWriter::Stop() {
  // notify all threads to exit
  for (size_t i = 0; i < threads_.size(); ++i) {
    q_.Push(IO(/*signal=*/true));
  }

  // wait for all threads to exit
  for (auto& th : threads_) {
    th.join();
    assert(!th.joinable());
  }
  threads_.clear();
}

void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
                           const uint64_t file_off,
                           const std::function<void()> callback) {
  q_.Push(IO(file, buf, file_off, callback));
}

void ThreadedWriter::ThreadMain() {
  while (true) {
    // Fetch the IO to process
    IO io(q_.Pop());
    if (io.signal_) {
      // that's secret signal to exit
      break;
    }

    // Reserve space for writing the buffer
    while (!cache_->Reserve(io.buf_->Used())) {
      // We can fail to reserve space if every file in the system
      // is being currently accessed
      /* sleep override */
      SystemClock::Default()->SleepForMicroseconds(1000000);
    }

    DispatchIO(io);

    io.callback_();
  }
}

void ThreadedWriter::DispatchIO(const IO& io) {
  size_t written = 0;
  while (written < io.buf_->Used()) {
    Slice data(io.buf_->Data() + written, io_size_);
    Status s = io.file_->Append(data);
    assert(s.ok());
    if (!s.ok()) {
      // That is definite IO error to device. There is not much we can
      // do but ignore the failure. This can lead to corruption of data on
      // disk, but the cache will skip while reading
      fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str());
    }
    written += io_size_;
  }
}

}  // namespace ROCKSDB_NAMESPACE

#endif