Fsync directory after we create a new file

Summary:
@dhruba, I'm not sure where we need to sync the directory. I implemented the function in Env() and added the dir sync just after we close the newly created file in the builder.

Should I also add FsyncDir() to new files that get created by a compaction?

Test Plan: Confirmed that FsyncDir is returning Status::OK()

Reviewers: dhruba, haobo

Reviewed By: dhruba

CC: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D14751
main
Igor Canadi 11 years ago
parent 6c2ca1d3e6
commit 832158e7f7
  1. 28
      db/db_impl.cc
  2. 2
      db/db_impl.h
  3. 12
      db/memtablelist.cc
  4. 3
      db/memtablelist.h
  5. 6
      db/version_set.cc
  6. 1
      db/version_set.h
  7. 8
      hdfs/env_hdfs.h
  8. 11
      helpers/memenv/memenv.cc
  9. 24
      include/rocksdb/env.h
  10. 5
      util/env_hdfs.cc
  11. 30
      util/env_posix.cc

@ -881,6 +881,11 @@ Status DBImpl::Recover(bool read_only, bool error_if_log_file_exist) {
return s;
}
s = env_->NewDirectory(dbname_, &db_directory_);
if (!s.ok()) {
return s;
}
s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok()) {
return s;
@ -1173,6 +1178,9 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
(unsigned long) meta.number,
(unsigned long) meta.file_size,
s.ToString().c_str());
if (!options_.disableDataSync) {
db_directory_->Fsync();
}
mutex_.Lock();
}
base->Unref();
@ -1267,8 +1275,8 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
// Replace immutable memtable with the generated Table
s = imm_.InstallMemtableFlushResults(
mems, versions_.get(), s, &mutex_, options_.info_log.get(),
file_number, pending_outputs_, &deletion_state.memtables_to_free);
mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number,
pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get());
if (s.ok()) {
InstallSuperVersion(deletion_state);
@ -1397,7 +1405,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
Log(options_.info_log, "Apply version edit:\n%s",
edit.DebugString().data());
status = versions_->LogAndApply(&edit, &mutex_);
status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
superversion_to_free = InstallSuperVersion(new_superversion);
new_superversion = nullptr;
@ -1969,7 +1977,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->edit(), &mutex_);
status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get());
InstallSuperVersion(deletion_state);
Version::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
@ -2217,7 +2225,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
compact->compaction->output_level(), out.number, out.file_size,
out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
return versions_->LogAndApply(compact->compaction->edit(), &mutex_,
db_directory_.get());
}
//
@ -2590,6 +2599,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
input.reset();
if (!options_.disableDataSync) {
db_directory_->Fsync();
}
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros - imm_micros;
MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros);
@ -3827,7 +3839,7 @@ Status DBImpl::DeleteFile(std::string name) {
}
}
edit.DeleteFile(level, number);
status = versions_->LogAndApply(&edit, &mutex_);
status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
if (status.ok()) {
InstallSuperVersion(deletion_state);
}
@ -3935,7 +3947,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
edit.SetLogNumber(new_log_number);
impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(std::move(lfile)));
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_,
impl->db_directory_.get());
}
if (s.ok()) {
delete impl->InstallSuperVersion(new DBImpl::SuperVersion());
@ -3943,6 +3956,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleFlushOrCompaction();
impl->MaybeScheduleLogDBDeployStats();
s = impl->db_directory_->Fsync();
}
}

@ -401,6 +401,8 @@ class DBImpl : public DB {
std::string host_name_;
std::unique_ptr<Directory> db_directory_;
// Queue of writers.
std::deque<Writer*> writers_;
WriteBatch tmp_batch_;

@ -121,12 +121,10 @@ void MemTableList::PickMemtablesToFlush(std::vector<MemTable*>* ret) {
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
const std::vector<MemTable*> &mems,
VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete) {
const std::vector<MemTable*>& mems, VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
std::set<uint64_t>& pending_outputs, std::vector<MemTable*>* to_delete,
Directory* db_directory) {
mu->AssertHeld();
// If the flush was not successful, then just reset state.
@ -178,7 +176,7 @@ Status MemTableList::InstallMemtableFlushResults(
(unsigned long)m->file_number_);
// this can release and reacquire the mutex.
s = vset->LogAndApply(&m->edit_, mu);
s = vset->LogAndApply(&m->edit_, mu, db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable

@ -95,7 +95,8 @@ class MemTableList {
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete);
std::vector<MemTable*>* to_delete,
Directory* db_directory);
// New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add().

@ -1419,6 +1419,7 @@ void VersionSet::AppendVersion(Version* v) {
}
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
Directory* db_directory,
bool new_descriptor_log) {
mu->AssertHeld();
@ -1541,6 +1542,9 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
// of it later
env_->DeleteFile(DescriptorFileName(dbname_, old_manifest_file_number));
}
if (!options_->disableDataSync && db_directory != nullptr) {
db_directory->Fsync();
}
}
// find offset in manifest file where this version is stored.
@ -1821,7 +1825,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
VersionEdit ve;
port::Mutex dummy_mutex;
MutexLock l(&dummy_mutex);
return versions.LogAndApply(&ve, &dummy_mutex, true);
return versions.LogAndApply(&ve, &dummy_mutex, nullptr, true);
}
Status VersionSet::DumpManifest(Options& options, std::string& dscname,

@ -284,6 +284,7 @@ class VersionSet {
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(VersionEdit* edit, port::Mutex* mu,
Directory* db_directory = nullptr,
bool new_descriptor_log = false);
// Recover the last saved descriptor from persistent storage.

@ -70,6 +70,9 @@ class HdfsEnv : public Env {
unique_ptr<RandomRWFile>* result,
const EnvOptions& options);
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result);
virtual bool FileExists(const std::string& fname);
virtual Status GetChildren(const std::string& path,
@ -246,6 +249,11 @@ class HdfsEnv : public Env {
return notsup;
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
return notsup;
}
virtual bool FileExists(const std::string& fname){return false;}
virtual Status GetChildren(const std::string& path,

@ -221,6 +221,11 @@ class WritableFileImpl : public WritableFile {
FileState* file_;
};
class InMemoryDirectory : public Directory {
public:
virtual Status Fsync() { return Status::OK(); }
};
class InMemoryEnv : public EnvWrapper {
public:
explicit InMemoryEnv(Env* base_env) : EnvWrapper(base_env) { }
@ -274,6 +279,12 @@ class InMemoryEnv : public EnvWrapper {
return Status::OK();
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
result->reset(new InMemoryDirectory());
return Status::OK();
}
virtual bool FileExists(const std::string& fname) {
MutexLock lock(&mutex_);
return file_map_.find(fname) != file_map_.end();

@ -33,6 +33,7 @@ class SequentialFile;
class Slice;
class WritableFile;
class RandomRWFile;
class Directory;
struct Options;
using std::unique_ptr;
@ -122,6 +123,16 @@ class Env {
unique_ptr<RandomRWFile>* result,
const EnvOptions& options) = 0;
// Create an object that represents a directory. Will fail if directory
// doesn't exist. If the directory exists, it will open the directory
// and create a new Directory object.
//
// On success, stores a pointer to the new Directory in
// *result and returns OK. On failure stores nullptr in *result and
// returns non-OK.
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) = 0;
// Returns true iff the named file exists.
virtual bool FileExists(const std::string& fname) = 0;
@ -488,6 +499,15 @@ class RandomRWFile {
void operator=(const RandomRWFile&);
};
// Directory object represents collection of files and implements
// filesystem operations that can be executed on directories.
class Directory {
public:
virtual ~Directory() {}
// Fsync directory
virtual Status Fsync() = 0;
};
// An interface for writing log messages.
class Logger {
public:
@ -578,6 +598,10 @@ class EnvWrapper : public Env {
const EnvOptions& options) {
return target_->NewRandomRWFile(f, r, options);
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
return target_->NewDirectory(name, result);
}
bool FileExists(const std::string& f) { return target_->FileExists(f); }
Status GetChildren(const std::string& dir, std::vector<std::string>* r) {
return target_->GetChildren(dir, r);

@ -366,6 +366,11 @@ Status HdfsEnv::NewRandomRWFile(const std::string& fname,
return Status::NotSupported("NewRandomRWFile not supported on HdfsEnv");
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
return Status::NotSupported("NewDirectory not yet supported on HdfsEnv");
}
bool HdfsEnv::FileExists(const std::string& fname) {
int value = hdfsExists(fileSys_, fname.c_str());
if (value == 0) {

@ -867,6 +867,24 @@ class PosixRandomRWFile : public RandomRWFile {
#endif
};
class PosixDirectory : public Directory {
public:
explicit PosixDirectory(int fd) : fd_(fd) {}
~PosixDirectory() {
close(fd_);
}
virtual Status Fsync() {
if (fsync(fd_) == -1) {
return IOError("directory", errno);
}
return Status::OK();
}
private:
int fd_;
};
static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
mutex_lockedFiles.Lock();
if (lock) {
@ -1038,6 +1056,18 @@ class PosixEnv : public Env {
return s;
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
result->reset();
const int fd = open(name.c_str(), 0);
if (fd < 0) {
return IOError(name, errno);
} else {
result->reset(new PosixDirectory(fd));
}
return Status::OK();
}
virtual bool FileExists(const std::string& fname) {
return access(fname.c_str(), F_OK) == 0;
}

Loading…
Cancel
Save