diff --git a/db/db_impl.cc b/db/db_impl.cc index d84e51699..50fb3da8f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -881,6 +881,11 @@ Status DBImpl::Recover(bool read_only, bool error_if_log_file_exist) { return s; } + s = env_->NewDirectory(dbname_, &db_directory_); + if (!s.ok()) { + return s; + } + s = env_->LockFile(LockFileName(dbname_), &db_lock_); if (!s.ok()) { return s; @@ -1173,6 +1178,9 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, (unsigned long) meta.number, (unsigned long) meta.file_size, s.ToString().c_str()); + if (!options_.disableDataSync) { + db_directory_->Fsync(); + } mutex_.Lock(); } base->Unref(); @@ -1267,8 +1275,8 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, // Replace immutable memtable with the generated Table s = imm_.InstallMemtableFlushResults( - mems, versions_.get(), s, &mutex_, options_.info_log.get(), - file_number, pending_outputs_, &deletion_state.memtables_to_free); + mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number, + pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get()); if (s.ok()) { InstallSuperVersion(deletion_state); @@ -1397,7 +1405,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) { Log(options_.info_log, "Apply version edit:\n%s", edit.DebugString().data()); - status = versions_->LogAndApply(&edit, &mutex_); + status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get()); superversion_to_free = InstallSuperVersion(new_superversion); new_superversion = nullptr; @@ -1969,7 +1977,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); - status = versions_->LogAndApply(c->edit(), &mutex_); + status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get()); InstallSuperVersion(deletion_state); Version::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", @@ -2217,7 +2225,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { compact->compaction->output_level(), out.number, out.file_size, out.smallest, out.largest, out.smallest_seqno, out.largest_seqno); } - return versions_->LogAndApply(compact->compaction->edit(), &mutex_); + return versions_->LogAndApply(compact->compaction->edit(), &mutex_, + db_directory_.get()); } // @@ -2590,6 +2599,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } input.reset(); + if (!options_.disableDataSync) { + db_directory_->Fsync(); + } CompactionStats stats; stats.micros = env_->NowMicros() - start_micros - imm_micros; MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros); @@ -3827,7 +3839,7 @@ Status DBImpl::DeleteFile(std::string name) { } } edit.DeleteFile(level, number); - status = versions_->LogAndApply(&edit, &mutex_); + status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get()); if (status.ok()) { InstallSuperVersion(deletion_state); } @@ -3935,7 +3947,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { edit.SetLogNumber(new_log_number); impl->logfile_number_ = new_log_number; impl->log_.reset(new log::Writer(std::move(lfile))); - s = impl->versions_->LogAndApply(&edit, &impl->mutex_); + s = impl->versions_->LogAndApply(&edit, &impl->mutex_, + impl->db_directory_.get()); } if (s.ok()) { delete impl->InstallSuperVersion(new DBImpl::SuperVersion()); @@ -3943,6 +3956,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { impl->DeleteObsoleteFiles(); impl->MaybeScheduleFlushOrCompaction(); impl->MaybeScheduleLogDBDeployStats(); + s = impl->db_directory_->Fsync(); } } diff --git a/db/db_impl.h b/db/db_impl.h index abefcba61..fb17a41b7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -401,6 +401,8 @@ class DBImpl : public DB { std::string host_name_; + std::unique_ptr db_directory_; + // Queue of writers. std::deque writers_; WriteBatch tmp_batch_; diff --git a/db/memtablelist.cc b/db/memtablelist.cc index ca662569f..c93b58b6e 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -121,12 +121,10 @@ void MemTableList::PickMemtablesToFlush(std::vector* ret) { // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( - const std::vector &mems, - VersionSet* vset, Status flushStatus, - port::Mutex* mu, Logger* info_log, - uint64_t file_number, - std::set& pending_outputs, - std::vector* to_delete) { + const std::vector& mems, VersionSet* vset, Status flushStatus, + port::Mutex* mu, Logger* info_log, uint64_t file_number, + std::set& pending_outputs, std::vector* to_delete, + Directory* db_directory) { mu->AssertHeld(); // If the flush was not successful, then just reset state. @@ -178,7 +176,7 @@ Status MemTableList::InstallMemtableFlushResults( (unsigned long)m->file_number_); // this can release and reacquire the mutex. - s = vset->LogAndApply(&m->edit_, mu); + s = vset->LogAndApply(&m->edit_, mu, db_directory); // we will be changing the version in the next code path, // so we better create a new one, since versions are immutable diff --git a/db/memtablelist.h b/db/memtablelist.h index 2a2b61408..b2cd84103 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -90,12 +90,13 @@ class MemTableList { void PickMemtablesToFlush(std::vector* mems); // Commit a successful flush in the manifest file - Status InstallMemtableFlushResults(const std::vector &m, - VersionSet* vset, Status flushStatus, - port::Mutex* mu, Logger* info_log, - uint64_t file_number, - std::set& pending_outputs, - std::vector* to_delete); + Status InstallMemtableFlushResults(const std::vector& m, + VersionSet* vset, Status flushStatus, + port::Mutex* mu, Logger* info_log, + uint64_t file_number, + std::set& pending_outputs, + std::vector* to_delete, + Directory* db_directory); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/db/version_set.cc b/db/version_set.cc index 145a99114..bf778c9a9 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1419,6 +1419,7 @@ void VersionSet::AppendVersion(Version* v) { } Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, + Directory* db_directory, bool new_descriptor_log) { mu->AssertHeld(); @@ -1541,6 +1542,9 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // of it later env_->DeleteFile(DescriptorFileName(dbname_, old_manifest_file_number)); } + if (!options_->disableDataSync && db_directory != nullptr) { + db_directory->Fsync(); + } } // find offset in manifest file where this version is stored. @@ -1821,7 +1825,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, VersionEdit ve; port::Mutex dummy_mutex; MutexLock l(&dummy_mutex); - return versions.LogAndApply(&ve, &dummy_mutex, true); + return versions.LogAndApply(&ve, &dummy_mutex, nullptr, true); } Status VersionSet::DumpManifest(Options& options, std::string& dscname, diff --git a/db/version_set.h b/db/version_set.h index 6b91355f7..376b259b9 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -284,6 +284,7 @@ class VersionSet { // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() Status LogAndApply(VersionEdit* edit, port::Mutex* mu, + Directory* db_directory = nullptr, bool new_descriptor_log = false); // Recover the last saved descriptor from persistent storage. diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index cb8ca623f..886ccdac3 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -70,6 +70,9 @@ class HdfsEnv : public Env { unique_ptr* result, const EnvOptions& options); + virtual Status NewDirectory(const std::string& name, + unique_ptr* result); + virtual bool FileExists(const std::string& fname); virtual Status GetChildren(const std::string& path, @@ -246,6 +249,11 @@ class HdfsEnv : public Env { return notsup; } + virtual Status NewDirectory(const std::string& name, + unique_ptr* result) { + return notsup; + } + virtual bool FileExists(const std::string& fname){return false;} virtual Status GetChildren(const std::string& path, diff --git a/helpers/memenv/memenv.cc b/helpers/memenv/memenv.cc index 15f1383a6..34d97fa76 100644 --- a/helpers/memenv/memenv.cc +++ b/helpers/memenv/memenv.cc @@ -221,6 +221,11 @@ class WritableFileImpl : public WritableFile { FileState* file_; }; +class InMemoryDirectory : public Directory { + public: + virtual Status Fsync() { return Status::OK(); } +}; + class InMemoryEnv : public EnvWrapper { public: explicit InMemoryEnv(Env* base_env) : EnvWrapper(base_env) { } @@ -274,6 +279,12 @@ class InMemoryEnv : public EnvWrapper { return Status::OK(); } + virtual Status NewDirectory(const std::string& name, + unique_ptr* result) { + result->reset(new InMemoryDirectory()); + return Status::OK(); + } + virtual bool FileExists(const std::string& fname) { MutexLock lock(&mutex_); return file_map_.find(fname) != file_map_.end(); diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 73acbfabe..06e9b94aa 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -33,6 +33,7 @@ class SequentialFile; class Slice; class WritableFile; class RandomRWFile; +class Directory; struct Options; using std::unique_ptr; @@ -122,6 +123,16 @@ class Env { unique_ptr* result, const EnvOptions& options) = 0; + // Create an object that represents a directory. Will fail if directory + // doesn't exist. If the directory exists, it will open the directory + // and create a new Directory object. + // + // On success, stores a pointer to the new Directory in + // *result and returns OK. On failure stores nullptr in *result and + // returns non-OK. + virtual Status NewDirectory(const std::string& name, + unique_ptr* result) = 0; + // Returns true iff the named file exists. virtual bool FileExists(const std::string& fname) = 0; @@ -488,6 +499,15 @@ class RandomRWFile { void operator=(const RandomRWFile&); }; +// Directory object represents collection of files and implements +// filesystem operations that can be executed on directories. +class Directory { + public: + virtual ~Directory() {} + // Fsync directory + virtual Status Fsync() = 0; +}; + // An interface for writing log messages. class Logger { public: @@ -578,6 +598,10 @@ class EnvWrapper : public Env { const EnvOptions& options) { return target_->NewRandomRWFile(f, r, options); } + virtual Status NewDirectory(const std::string& name, + unique_ptr* result) { + return target_->NewDirectory(name, result); + } bool FileExists(const std::string& f) { return target_->FileExists(f); } Status GetChildren(const std::string& dir, std::vector* r) { return target_->GetChildren(dir, r); diff --git a/util/env_hdfs.cc b/util/env_hdfs.cc index 0f8fe0d11..67f0ef797 100644 --- a/util/env_hdfs.cc +++ b/util/env_hdfs.cc @@ -366,6 +366,11 @@ Status HdfsEnv::NewRandomRWFile(const std::string& fname, return Status::NotSupported("NewRandomRWFile not supported on HdfsEnv"); } +virtual Status NewDirectory(const std::string& name, + unique_ptr* result) { + return Status::NotSupported("NewDirectory not yet supported on HdfsEnv"); +} + bool HdfsEnv::FileExists(const std::string& fname) { int value = hdfsExists(fileSys_, fname.c_str()); if (value == 0) { diff --git a/util/env_posix.cc b/util/env_posix.cc index 2be524e95..fb9d311b8 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -867,6 +867,24 @@ class PosixRandomRWFile : public RandomRWFile { #endif }; +class PosixDirectory : public Directory { + public: + explicit PosixDirectory(int fd) : fd_(fd) {} + ~PosixDirectory() { + close(fd_); + } + + virtual Status Fsync() { + if (fsync(fd_) == -1) { + return IOError("directory", errno); + } + return Status::OK(); + } + + private: + int fd_; +}; + static int LockOrUnlock(const std::string& fname, int fd, bool lock) { mutex_lockedFiles.Lock(); if (lock) { @@ -1038,6 +1056,18 @@ class PosixEnv : public Env { return s; } + virtual Status NewDirectory(const std::string& name, + unique_ptr* result) { + result->reset(); + const int fd = open(name.c_str(), 0); + if (fd < 0) { + return IOError(name, errno); + } else { + result->reset(new PosixDirectory(fd)); + } + return Status::OK(); + } + virtual bool FileExists(const std::string& fname) { return access(fname.c_str(), F_OK) == 0; }