Sync WAL Directory and DB Path if different from DB directory

Summary:
1. If WAL directory is different from db directory. Sync the directory after creating a log file under it.
2. After creating an SST file, sync its parent directory instead of DB directory.
3. change the check of kResetDeleteUnsyncedFiles in fault_injection_test. Since we changed the behavior to sync log files' parent directory after first WAL sync, instead of creating, kResetDeleteUnsyncedFiles will not guarantee to show post sync updates.

Test Plan: make all check

Reviewers: yhchiang, rven, igor

Reviewed By: igor

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D32067
main
sdong 10 years ago
parent 58f34edfce
commit d888c95748
  1. 1
      Makefile
  2. 9
      db/compaction_job.cc
  3. 7
      db/compaction_job.h
  4. 9
      db/compaction_job_test.cc
  5. 124
      db/db_impl.cc
  6. 30
      db/db_impl.h
  7. 15
      db/fault_injection_test.cc
  8. 10
      db/flush_job.cc
  9. 4
      db/flush_job.h
  10. 4
      db/flush_job_test.cc

@ -108,6 +108,7 @@ BENCHHARNESS = ./util/benchharness.o
VALGRIND_ERROR = 2 VALGRIND_ERROR = 2
VALGRIND_DIR = build_tools/VALGRIND_LOGS VALGRIND_DIR = build_tools/VALGRIND_LOGS
VALGRIND_VER := $(join $(VALGRIND_VER),valgrind) VALGRIND_VER := $(join $(VALGRIND_VER),valgrind)
VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full
TESTS = \ TESTS = \

@ -205,8 +205,8 @@ CompactionJob::CompactionJob(
Compaction* compaction, const DBOptions& db_options, Compaction* compaction, const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
VersionSet* versions, std::atomic<bool>* shutting_down, VersionSet* versions, std::atomic<bool>* shutting_down,
LogBuffer* log_buffer, Directory* db_directory, Statistics* stats, LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory,
SnapshotList* snapshots, bool is_snapshot_supported, Statistics* stats, SnapshotList* snapshots, bool is_snapshot_supported,
std::shared_ptr<Cache> table_cache, std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback) std::function<uint64_t()> yield_callback)
: compact_(new CompactionState(compaction)), : compact_(new CompactionState(compaction)),
@ -219,6 +219,7 @@ CompactionJob::CompactionJob(
shutting_down_(shutting_down), shutting_down_(shutting_down),
log_buffer_(log_buffer), log_buffer_(log_buffer),
db_directory_(db_directory), db_directory_(db_directory),
output_directory_(output_directory),
stats_(stats), stats_(stats),
snapshots_(snapshots), snapshots_(snapshots),
is_snapshot_supported_(is_snapshot_supported), is_snapshot_supported_(is_snapshot_supported),
@ -422,8 +423,8 @@ Status CompactionJob::Run() {
} }
input.reset(); input.reset();
if (db_directory_ && !db_options_.disableDataSync) { if (output_directory_ && !db_options_.disableDataSync) {
db_directory_->Fsync(); output_directory_->Fsync();
} }
compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros; compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros;

@ -57,9 +57,9 @@ class CompactionJob {
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions, const EnvOptions& env_options, VersionSet* versions,
std::atomic<bool>* shutting_down, LogBuffer* log_buffer, std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Statistics* stats, Directory* db_directory, Directory* output_directory,
SnapshotList* snapshot_list, bool is_snapshot_supported, Statistics* stats, SnapshotList* snapshot_list,
std::shared_ptr<Cache> table_cache, bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback); std::function<uint64_t()> yield_callback);
~CompactionJob() { assert(compact_ == nullptr); } ~CompactionJob() { assert(compact_ == nullptr); }
@ -114,6 +114,7 @@ class CompactionJob {
std::atomic<bool>* shutting_down_; std::atomic<bool>* shutting_down_;
LogBuffer* log_buffer_; LogBuffer* log_buffer_;
Directory* db_directory_; Directory* db_directory_;
Directory* output_directory_;
Statistics* stats_; Statistics* stats_;
SnapshotList* snapshots_; SnapshotList* snapshots_;
bool is_snapshot_supported_; bool is_snapshot_supported_;

@ -160,10 +160,11 @@ TEST(CompactionJobTest, Simple) {
}; };
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
mutex_.Lock(); mutex_.Lock();
CompactionJob compaction_job( CompactionJob compaction_job(compaction.get(), db_options_,
compaction.get(), db_options_, *cfd->GetLatestMutableCFOptions(), *cfd->GetLatestMutableCFOptions(), env_options_,
env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr, versions_.get(), &shutting_down_, &log_buffer,
nullptr, &snapshots, true, table_cache_, std::move(yield_callback)); nullptr, nullptr, nullptr, &snapshots, true,
table_cache_, std::move(yield_callback));
compaction_job.Prepare(); compaction_job.Prepare();
mutex_.Unlock(); mutex_.Unlock();
ASSERT_OK(compaction_job.Run()); ASSERT_OK(compaction_job.Run());

@ -201,6 +201,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
shutting_down_(false), shutting_down_(false),
bg_cv_(&mutex_), bg_cv_(&mutex_),
logfile_number_(0), logfile_number_(0),
log_dir_unsynced_(true),
log_empty_(true), log_empty_(true),
default_cf_handle_(nullptr), default_cf_handle_(nullptr),
total_log_size_(0), total_log_size_(0),
@ -354,7 +355,7 @@ Status DBImpl::NewDB() {
} }
if (s.ok()) { if (s.ok()) {
// Make "CURRENT" file that points to the new manifest file. // Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(env_, dbname_, 1, db_directory_.get()); s = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir());
} else { } else {
env_->DeleteFile(manifest); env_->DeleteFile(manifest);
} }
@ -701,14 +702,9 @@ void DBImpl::DeleteObsoleteFiles() {
job_context.Clean(); job_context.Clean();
} }
Status DBImpl::Recover( Status DBImpl::Directories::CreateAndNewDirectory(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only, Env* env, const std::string& dirname,
bool error_if_log_file_exist) { std::unique_ptr<Directory>* directory) const {
mutex_.AssertHeld();
bool is_new_db = false;
assert(db_lock_ == nullptr);
if (!read_only) {
// We call CreateDirIfMissing() as the directory may already exist (if we // We call CreateDirIfMissing() as the directory may already exist (if we
// are reopening a DB), when this happens we don't want creating the // are reopening a DB), when this happens we don't want creating the
// directory to cause an error. However, we need to check if creating the // directory to cause an error. However, we need to check if creating the
@ -716,19 +712,65 @@ Status DBImpl::Recover(
// file not existing. One real-world example of this occurring is if // file not existing. One real-world example of this occurring is if
// env->CreateDirIfMissing() doesn't create intermediate directories, e.g. // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
// when dbname_ is "dir/db" but when "dir" doesn't exist. // when dbname_ is "dir/db" but when "dir" doesn't exist.
Status s = env_->CreateDirIfMissing(dbname_); Status s = env->CreateDirIfMissing(dirname);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
return env->NewDirectory(dirname, directory);
}
for (auto& db_path : db_options_.db_paths) { Status DBImpl::Directories::SetDirectories(
s = env_->CreateDirIfMissing(db_path.path); Env* env, const std::string& dbname, const std::string& wal_dir,
const std::vector<DbPath>& data_paths) {
Status s = CreateAndNewDirectory(env, dbname, &db_dir_);
if (!s.ok()) {
return s;
}
if (!wal_dir.empty() && dbname != wal_dir) {
s = CreateAndNewDirectory(env, wal_dir, &wal_dir_);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
} }
s = env_->NewDirectory(dbname_, &db_directory_); data_dirs_.clear();
for (auto& p : data_paths) {
const std::string db_path = p.path;
if (db_path == dbname) {
data_dirs_.emplace_back(nullptr);
} else {
std::unique_ptr<Directory> path_directory;
s = CreateAndNewDirectory(env, db_path, &path_directory);
if (!s.ok()) {
return s;
}
data_dirs_.emplace_back(path_directory.release());
}
}
assert(data_dirs_.size() == data_paths.size());
return Status::OK();
}
Directory* DBImpl::Directories::GetDataDir(size_t path_id) {
assert(path_id < data_dirs_.size());
Directory* ret_dir = data_dirs_[path_id].get();
if (ret_dir == nullptr) {
// Should use db_dir_
return db_dir_.get();
}
return ret_dir;
}
Status DBImpl::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
bool error_if_log_file_exist) {
mutex_.AssertHeld();
bool is_new_db = false;
assert(db_lock_ == nullptr);
if (!read_only) {
Status s = directories_.SetDirectories(env_, dbname_, db_options_.wal_dir,
db_options_.db_paths);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -1088,8 +1130,8 @@ Status DBImpl::FlushMemTableToOutputFile(
FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options, FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options,
env_options_, versions_.get(), &mutex_, &shutting_down_, env_options_, versions_.get(), &mutex_, &shutting_down_,
snapshots_.GetNewest(), job_context, log_buffer, snapshots_.GetNewest(), job_context, log_buffer,
db_directory_.get(), GetCompressionFlush(*cfd->ioptions()), directories_.GetDbDir(), directories_.GetDataDir(0U),
stats_); GetCompressionFlush(*cfd->ioptions()), stats_);
uint64_t file_number; uint64_t file_number;
Status s = flush_job.Run(&file_number); Status s = flush_job.Run(&file_number);
@ -1331,11 +1373,11 @@ Status DBImpl::CompactFilesImpl(
*c->mutable_cf_options(), &job_context, *c->mutable_cf_options(), &job_context,
&log_buffer); &log_buffer);
}; };
CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(), CompactionJob compaction_job(
env_options_, versions_.get(), &shutting_down_, c.get(), db_options_, *c->mutable_cf_options(), env_options_,
&log_buffer, db_directory_.get(), stats_, versions_.get(), &shutting_down_, &log_buffer, directories_.GetDbDir(),
&snapshots_, is_snapshot_supported_, directories_.GetDataDir(c->GetOutputPathId()), stats_, &snapshots_,
table_cache_, std::move(yield_callback)); is_snapshot_supported_, table_cache_, std::move(yield_callback));
compaction_job.Prepare(); compaction_job.Prepare();
mutex_.Unlock(); mutex_.Unlock();
@ -1510,8 +1552,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
"[%s] Apply version edit:\n%s", "[%s] Apply version edit:\n%s",
cfd->GetName().c_str(), edit.DebugString().data()); cfd->GetName().c_str(), edit.DebugString().data());
status = versions_->LogAndApply(cfd, status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
mutable_cf_options, &edit, &mutex_, db_directory_.get()); directories_.GetDbDir());
superversion_to_free = InstallSuperVersion( superversion_to_free = InstallSuperVersion(
cfd, new_superversion, mutable_cf_options); cfd, new_superversion, mutable_cf_options);
new_superversion = nullptr; new_superversion = nullptr;
@ -2136,9 +2178,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
for (const auto& f : *c->inputs(0)) { for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
} }
status = versions_->LogAndApply( status = versions_->LogAndApply(c->column_family_data(),
c->column_family_data(), *c->mutable_cf_options(), c->edit(), *c->mutable_cf_options(), c->edit(),
&mutex_, db_directory_.get()); &mutex_, directories_.GetDbDir());
InstallSuperVersionBackground(c->column_family_data(), job_context, InstallSuperVersionBackground(c->column_family_data(), job_context,
*c->mutable_cf_options()); *c->mutable_cf_options());
LogToBuffer(log_buffer, "[%s] Deleted %d files\n", LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
@ -2164,8 +2206,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
f->fd.GetFileSize(), f->smallest, f->largest, f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno); f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->column_family_data(), status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), *c->mutable_cf_options(), c->edit(),
c->edit(), &mutex_, db_directory_.get()); &mutex_, directories_.GetDbDir());
// Use latest MutableCFOptions // Use latest MutableCFOptions
InstallSuperVersionBackground(c->column_family_data(), job_context, InstallSuperVersionBackground(c->column_family_data(), job_context,
*c->mutable_cf_options()); *c->mutable_cf_options());
@ -2190,11 +2232,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
*c->mutable_cf_options(), job_context, *c->mutable_cf_options(), job_context,
log_buffer); log_buffer);
}; };
CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(), CompactionJob compaction_job(
env_options_, versions_.get(), &shutting_down_, c.get(), db_options_, *c->mutable_cf_options(), env_options_,
log_buffer, db_directory_.get(), stats_, versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
&snapshots_, is_snapshot_supported_, directories_.GetDataDir(c->GetOutputPathId()), stats_, &snapshots_,
table_cache_, std::move(yield_callback)); is_snapshot_supported_, table_cache_, std::move(yield_callback));
compaction_job.Prepare(); compaction_job.Prepare();
mutex_.Unlock(); mutex_.Unlock();
status = compaction_job.Run(); status = compaction_job.Run();
@ -2600,7 +2642,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
// ColumnFamilyData object // ColumnFamilyData object
s = versions_->LogAndApply( s = versions_->LogAndApply(
nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit, nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit,
&mutex_, db_directory_.get(), false, &cf_options); &mutex_, directories_.GetDbDir(), false, &cf_options);
write_thread_.ExitWriteThread(&w, &w, s); write_thread_.ExitWriteThread(&w, &w, s);
} }
if (s.ok()) { if (s.ok()) {
@ -3059,6 +3101,13 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
} else { } else {
status = log_->file()->Sync(); status = log_->file()->Sync();
} }
if (status.ok() && log_dir_unsynced_) {
// We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync,
// we can avoid the disk I/O in the write code path.
status = directories_.GetWalDir()->Fsync();
}
log_dir_unsynced_ = false;
} }
} }
if (status.ok()) { if (status.ok()) {
@ -3193,14 +3242,15 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
{ {
if (creating_new_log) { if (creating_new_log) {
s = env_->NewWritableFile( s = env_->NewWritableFile(
LogFileName(db_options_.wal_dir, new_log_number), LogFileName(db_options_.wal_dir, new_log_number), &lfile,
&lfile, env_->OptimizeForLogWrite(env_options_)); env_->OptimizeForLogWrite(env_options_));
if (s.ok()) { if (s.ok()) {
// Our final size should be less than write_buffer_size // Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution. // (compression, etc) but err on the side of caution.
lfile->SetPreallocationBlockSize( lfile->SetPreallocationBlockSize(
1.1 * mutable_cf_options.write_buffer_size); 1.1 * mutable_cf_options.write_buffer_size);
new_log = new log::Writer(std::move(lfile)); new_log = new log::Writer(std::move(lfile));
log_dir_unsynced_ = true;
} }
} }
@ -3497,7 +3547,7 @@ Status DBImpl::DeleteFile(std::string name) {
edit.SetColumnFamily(cfd->GetID()); edit.SetColumnFamily(cfd->GetID());
edit.DeleteFile(level, number); edit.DeleteFile(level, number);
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, db_directory_.get()); &edit, &mutex_, directories_.GetDbDir());
if (status.ok()) { if (status.ok()) {
InstallSuperVersionBackground(cfd, &job_context, InstallSuperVersionBackground(cfd, &job_context,
*cfd->GetLatestMutableCFOptions()); *cfd->GetLatestMutableCFOptions());
@ -3745,7 +3795,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
impl->alive_log_files_.push_back( impl->alive_log_files_.push_back(
DBImpl::LogFileNumberSize(impl->logfile_number_)); DBImpl::LogFileNumberSize(impl->logfile_number_));
impl->DeleteObsoleteFiles(); impl->DeleteObsoleteFiles();
s = impl->db_directory_->Fsync(); s = impl->directories_.GetDbDir()->Fsync();
} }
} }

@ -422,6 +422,7 @@ class DBImpl : public DB {
port::CondVar bg_cv_; port::CondVar bg_cv_;
uint64_t logfile_number_; uint64_t logfile_number_;
unique_ptr<log::Writer> log_; unique_ptr<log::Writer> log_;
bool log_dir_unsynced_;
bool log_empty_; bool log_empty_;
ColumnFamilyHandleImpl* default_cf_handle_; ColumnFamilyHandleImpl* default_cf_handle_;
InternalStats* default_cf_internal_stats_; InternalStats* default_cf_internal_stats_;
@ -445,7 +446,34 @@ class DBImpl : public DB {
bool is_snapshot_supported_; bool is_snapshot_supported_;
std::unique_ptr<Directory> db_directory_; // Class to maintain directories for all database paths other than main one.
class Directories {
public:
Status SetDirectories(Env* env, const std::string& dbname,
const std::string& wal_dir,
const std::vector<DbPath>& data_paths);
Directory* GetDataDir(size_t path_id);
Directory* GetWalDir() {
if (wal_dir_) {
return wal_dir_.get();
}
return db_dir_.get();
}
Directory* GetDbDir() { return db_dir_.get(); }
private:
std::unique_ptr<Directory> db_dir_;
std::vector<std::unique_ptr<Directory>> data_dirs_;
std::unique_ptr<Directory> wal_dir_;
Status CreateAndNewDirectory(Env* env, const std::string& dirname,
std::unique_ptr<Directory>* directory) const;
};
Directories directories_;
WriteBuffer write_buffer_; WriteBuffer write_buffer_;

@ -272,6 +272,7 @@ class FaultInjectionTestEnv : public EnvWrapper {
} }
void SyncDir(const std::string& dirname) { void SyncDir(const std::string& dirname) {
MutexLock l(&mutex_);
dir_to_new_files_since_last_sync_.erase(dirname); dir_to_new_files_since_last_sync_.erase(dirname);
} }
@ -630,32 +631,22 @@ TEST(FaultInjectionTest, FaultTest) {
NoWriteTestPreFault(); NoWriteTestPreFault();
NoWriteTestReopenWithFault(kResetDropUnsyncedData); NoWriteTestReopenWithFault(kResetDropUnsyncedData);
// TODO(t6070540) Need to sync WAL Dir and other DB paths too.
// Setting a separate data path won't pass the test as we don't sync // Setting a separate data path won't pass the test as we don't sync
// it after creating new files, // it after creating new files,
if (option_config_ != kDifferentDataDir) {
PartialCompactTestPreFault(num_pre_sync, num_post_sync); PartialCompactTestPreFault(num_pre_sync, num_post_sync);
// Since we don't sync WAL Dir, this test dosn't pass.
if (option_config_ != kWalDirSyncWal) {
PartialCompactTestReopenWithFault(kResetDropAndDeleteUnsynced, PartialCompactTestReopenWithFault(kResetDropAndDeleteUnsynced,
num_pre_sync, num_post_sync); num_pre_sync, num_post_sync);
}
NoWriteTestPreFault(); NoWriteTestPreFault();
NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
PartialCompactTestPreFault(num_pre_sync, num_post_sync); PartialCompactTestPreFault(num_pre_sync, num_post_sync);
// No new files created so we expect all values since no files will be // No new files created so we expect all values since no files will be
// dropped. // dropped.
// WAL Dir is not synced for now. PartialCompactTestReopenWithFault(kResetDeleteUnsyncedFiles, num_pre_sync,
if (option_config_ != kWalDir && option_config_ != kWalDirSyncWal) { num_post_sync);
PartialCompactTestReopenWithFault(kResetDeleteUnsyncedFiles,
num_pre_sync + num_post_sync, 0);
}
NoWriteTestPreFault(); NoWriteTestPreFault();
NoWriteTestReopenWithFault(kResetDeleteUnsyncedFiles); NoWriteTestReopenWithFault(kResetDeleteUnsyncedFiles);
} }
}
} while (ChangeOptions()); } while (ChangeOptions());
} }

@ -58,6 +58,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
port::Mutex* db_mutex, std::atomic<bool>* shutting_down, port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
SequenceNumber newest_snapshot, JobContext* job_context, SequenceNumber newest_snapshot, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory, LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory,
CompressionType output_compression, Statistics* stats) CompressionType output_compression, Statistics* stats)
: dbname_(dbname), : dbname_(dbname),
cfd_(cfd), cfd_(cfd),
@ -71,6 +72,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
job_context_(job_context), job_context_(job_context),
log_buffer_(log_buffer), log_buffer_(log_buffer),
db_directory_(db_directory), db_directory_(db_directory),
output_file_directory_(output_file_directory),
output_compression_(output_compression), output_compression_(output_compression),
stats_(stats) {} stats_(stats) {}
@ -125,10 +127,9 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
db_mutex_->AssertHeld(); db_mutex_->AssertHeld();
const uint64_t start_micros = db_options_.env->NowMicros(); const uint64_t start_micros = db_options_.env->NowMicros();
FileMetaData meta; FileMetaData meta;
// path 0 for level 0 file.
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
*filenumber = meta.fd.GetNumber(); *filenumber = meta.fd.GetNumber();
// path 0 for level 0 file.
const SequenceNumber earliest_seqno_in_memtable = const SequenceNumber earliest_seqno_in_memtable =
mems[0]->GetFirstSequenceNumber(); mems[0]->GetFirstSequenceNumber();
@ -169,9 +170,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
"[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s", "[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
cfd_->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), cfd_->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
s.ToString().c_str()); s.ToString().c_str());
if (!db_options_.disableDataSync && output_file_directory_ != nullptr) {
if (!db_options_.disableDataSync && db_directory_ != nullptr) { output_file_directory_->Fsync();
db_directory_->Fsync();
} }
db_mutex_->Lock(); db_mutex_->Lock();
} }

@ -57,7 +57,8 @@ class FlushJob {
port::Mutex* db_mutex, std::atomic<bool>* shutting_down, port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
SequenceNumber newest_snapshot, JobContext* job_context, SequenceNumber newest_snapshot, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory, LogBuffer* log_buffer, Directory* db_directory,
CompressionType output_compression, Statistics* stats); Directory* output_file_directory, CompressionType output_compression,
Statistics* stats);
~FlushJob() {} ~FlushJob() {}
Status Run(uint64_t* file_number = nullptr); Status Run(uint64_t* file_number = nullptr);
@ -77,6 +78,7 @@ class FlushJob {
JobContext* job_context_; JobContext* job_context_;
LogBuffer* log_buffer_; LogBuffer* log_buffer_;
Directory* db_directory_; Directory* db_directory_;
Directory* output_file_directory_;
CompressionType output_compression_; CompressionType output_compression_;
Statistics* stats_; Statistics* stats_;
}; };

@ -86,7 +86,7 @@ TEST(FlushJobTest, Empty) {
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_, env_options_, versions_.get(), &mutex_, &shutting_down_,
SequenceNumber(), &job_context, nullptr, nullptr, SequenceNumber(), &job_context, nullptr, nullptr, nullptr,
kNoCompression, nullptr); kNoCompression, nullptr);
ASSERT_OK(flush_job.Run()); ASSERT_OK(flush_job.Run());
job_context.Clean(); job_context.Clean();
@ -110,7 +110,7 @@ TEST(FlushJobTest, NonEmpty) {
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_, env_options_, versions_.get(), &mutex_, &shutting_down_,
SequenceNumber(), &job_context, nullptr, nullptr, SequenceNumber(), &job_context, nullptr, nullptr, nullptr,
kNoCompression, nullptr); kNoCompression, nullptr);
mutex_.Lock(); mutex_.Lock();
ASSERT_OK(flush_job.Run()); ASSERT_OK(flush_job.Run());

Loading…
Cancel
Save