BlobDB: update blob_db_options.bytes_per_sync behavior

Summary:
Previously, if blob_db_options.bytes_per_sync, there is a background job to call fsync() for every bytes_per_sync bytes written to a blob file. With the change we simply pass bytes_per_sync as env_options_ to blob files so that sync_file_range() will be used instead.
Closes https://github.com/facebook/rocksdb/pull/3297

Differential Revision: D6606994

Pulled By: yiwu-arbug

fbshipit-source-id: 452424be52e32ba92f5ea603b564e9b88929af47
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 06149429d9
commit 48cf8da2bb
  1. 8
      utilities/blob_db/blob_db.h
  2. 46
      utilities/blob_db/blob_db_impl.cc
  3. 10
      utilities/blob_db/blob_db_impl.h

@ -56,8 +56,10 @@ struct BlobDBOptions {
// will be inlined in base DB together with the key.
uint64_t min_blob_size = 0;
// at what bytes will the blob files be synced to blob log.
uint64_t bytes_per_sync = 0;
// Allows OS to incrementally sync blob files to disk for every
// bytes_per_sync bytes written. Users shouldn't rely on it for
// persistency guarantee.
uint64_t bytes_per_sync = 512 * 1024;
// the target size of each blob file. File will become immutable
// after it exceeds that size
@ -203,6 +205,8 @@ class BlobDB : public StackableDB {
virtual BlobDBOptions GetBlobDBOptions() const = 0;
virtual Status SyncBlobFiles() = 0;
virtual ~BlobDB() {}
protected:

@ -111,7 +111,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
cf_options_(cf_options),
env_options_(db_options),
statistics_(db_options_.statistics.get()),
dir_change_(false),
next_file_number_(1),
epoch_of_(0),
shutdown_(false),
@ -124,6 +123,7 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
blob_dir_ = (bdb_options_.path_relative)
? dbname + "/" + bdb_options_.blob_dir
: bdb_options_.blob_dir;
env_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
}
BlobDBImpl::~BlobDBImpl() {
@ -225,8 +225,6 @@ void BlobDBImpl::StartBackgroundTasks() {
std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
tqueue_.add(kSanityCheckPeriodMillisecs,
std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
tqueue_.add(kFSyncFilesPeriodMillisecs,
std::bind(&BlobDBImpl::FsyncFiles, this, std::placeholders::_1));
tqueue_.add(
kCheckSeqFilesPeriodMillisecs,
std::bind(&BlobDBImpl::CheckSeqFiles, this, std::placeholders::_1));
@ -472,7 +470,6 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
return nullptr;
}
dir_change_.store(true);
blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
open_non_ttl_file_ = bfile;
return bfile;
@ -547,7 +544,6 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
return nullptr;
}
dir_change_.store(true);
blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
open_ttl_files_.insert(bfile);
epoch_of_++;
@ -1434,14 +1430,6 @@ std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
return std::make_pair(true, -1);
}
std::pair<bool, int64_t> BlobDBImpl::FsyncFiles(bool aborted) {
if (aborted || shutdown_) {
return std::make_pair(false, -1);
}
SyncBlobFiles();
return std::make_pair(true, -1);
}
Status BlobDBImpl::SyncBlobFiles() {
MutexLock l(&write_mutex_);
@ -1449,35 +1437,30 @@ Status BlobDBImpl::SyncBlobFiles() {
{
ReadLock rl(&mutex_);
for (auto fitr : open_ttl_files_) {
if (fitr->NeedsFsync(true, bdb_options_.bytes_per_sync))
process_files.push_back(fitr);
process_files.push_back(fitr);
}
if (open_non_ttl_file_ != nullptr &&
open_non_ttl_file_->NeedsFsync(true, bdb_options_.bytes_per_sync)) {
if (open_non_ttl_file_ != nullptr) {
process_files.push_back(open_non_ttl_file_);
}
}
Status s;
for (auto& blob_file : process_files) {
if (blob_file->NeedsFsync(true, bdb_options_.bytes_per_sync)) {
s = blob_file->Fsync();
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to sync blob file %" PRIu64 ", status: %s",
blob_file->BlobFileNumber(), s.ToString().c_str());
return s;
}
s = blob_file->Fsync();
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to sync blob file %" PRIu64 ", status: %s",
blob_file->BlobFileNumber(), s.ToString().c_str());
return s;
}
}
bool expected = true;
if (dir_change_.compare_exchange_weak(expected, false)) {
s = dir_ent_->Fsync();
s = dir_ent_->Fsync();
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to sync blob directory, status: %s",
s.ToString().c_str());
}
return s;
}
@ -1732,7 +1715,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
WriteLock wl(&mutex_);
dir_change_.store(true);
blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
}

@ -158,9 +158,6 @@ class BlobDBImpl : public BlobDB {
// if 50% of the space of a blob file has been deleted/expired,
static constexpr uint32_t kPartialExpirationPercentage = 75;
// how often should we schedule a job to fsync open files
static constexpr uint32_t kFSyncFilesPeriodMillisecs = 10 * 1000;
// how often to schedule reclaim open files.
static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;
@ -228,7 +225,7 @@ class BlobDBImpl : public BlobDB {
Status Open(std::vector<ColumnFamilyHandle*>* handles);
Status SyncBlobFiles();
Status SyncBlobFiles() override;
#ifndef NDEBUG
Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
@ -313,9 +310,6 @@ class BlobDBImpl : public BlobDB {
// Major task to garbage collect expired and deleted blobs
std::pair<bool, int64_t> RunGC(bool aborted);
// asynchronous task to fsync/fdatasync the open blob files
std::pair<bool, int64_t> FsyncFiles(bool aborted);
// periodically check if open blob files and their TTL's has expired
// if expired, close the sequential writer and make the file immutable
std::pair<bool, int64_t> CheckSeqFiles(bool aborted);
@ -420,8 +414,6 @@ class BlobDBImpl : public BlobDB {
// pointer to directory
std::unique_ptr<Directory> dir_ent_;
std::atomic<bool> dir_change_;
// Read Write Mutex, which protects all the data structures
// HEAVILY TRAFFICKED
mutable port::RWMutex mutex_;

Loading…
Cancel
Save