Replace Directory with FSDirectory in DB (#6468)

Summary:
In the current code base, we can use Directory from Env to manage directory (e.g, Fsync()). The PR https://github.com/facebook/rocksdb/issues/5761  introduce the File System as a new Env API. So we further replace the Directory class in DB with FSDirectory such that we can have more IO information from IOStatus returned by FSDirectory.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6468

Test Plan: pass make asan_check

Differential Revision: D20195261

Pulled By: zhichao-cao

fbshipit-source-id: 93962cb9436852bfcfb76e086d9e7babd461cbe1
main
Zhichao Cao 5 years ago committed by Facebook Github Bot
parent 904a60ff63
commit 8d73137ae8
  1. 8
      db/column_family.cc
  2. 6
      db/column_family.h
  3. 4
      db/compaction/compaction_job.cc
  4. 8
      db/compaction/compaction_job.h
  5. 8
      db/db_impl/db_impl.cc
  6. 27
      db/db_impl/db_impl.h
  7. 8
      db/db_impl/db_impl_compaction_flush.cc
  8. 47
      db/db_impl/db_impl_open.cc
  9. 2
      db/db_impl/db_impl_write.cc
  10. 2
      db/external_sst_file_ingestion_job.cc
  11. 6
      db/flush_job.cc
  12. 11
      db/flush_job.h
  13. 4
      db/memtable_list.cc
  14. 8
      db/memtable_list.h
  15. 4
      db/version_set.cc
  16. 10
      db/version_set.h
  17. 4
      file/filename.cc
  18. 3
      file/filename.h

@ -1321,7 +1321,7 @@ Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
} }
Status ColumnFamilyData::AddDirectories( Status ColumnFamilyData::AddDirectories(
std::map<std::string, std::shared_ptr<Directory>>* created_dirs) { std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
Status s; Status s;
assert(created_dirs != nullptr); assert(created_dirs != nullptr);
assert(data_dirs_.empty()); assert(data_dirs_.empty());
@ -1329,8 +1329,8 @@ Status ColumnFamilyData::AddDirectories(
auto existing_dir = created_dirs->find(p.path); auto existing_dir = created_dirs->find(p.path);
if (existing_dir == created_dirs->end()) { if (existing_dir == created_dirs->end()) {
std::unique_ptr<Directory> path_directory; std::unique_ptr<FSDirectory> path_directory;
s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory); s = DBImpl::CreateAndNewDirectory(ioptions_.fs, p.path, &path_directory);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -1345,7 +1345,7 @@ Status ColumnFamilyData::AddDirectories(
return s; return s;
} }
Directory* ColumnFamilyData::GetDataDir(size_t path_id) const { FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
if (data_dirs_.empty()) { if (data_dirs_.empty()) {
return nullptr; return nullptr;
} }

@ -500,9 +500,9 @@ class ColumnFamilyData {
// created_dirs remembers directory created, so that we don't need to call // created_dirs remembers directory created, so that we don't need to call
// the same data creation operation again. // the same data creation operation again.
Status AddDirectories( Status AddDirectories(
std::map<std::string, std::shared_ptr<Directory>>* created_dirs); std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs);
Directory* GetDataDir(size_t path_id) const; FSDirectory* GetDataDir(size_t path_id) const;
ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); } ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
@ -592,7 +592,7 @@ class ColumnFamilyData {
std::atomic<uint64_t> last_memtable_id_; std::atomic<uint64_t> last_memtable_id_;
// Directories corresponding to cf_paths. // Directories corresponding to cf_paths.
std::vector<std::shared_ptr<Directory>> data_dirs_; std::vector<std::shared_ptr<FSDirectory>> data_dirs_;
}; };
// ColumnFamilySet has interesting thread-safety requirements // ColumnFamilySet has interesting thread-safety requirements

@ -298,7 +298,7 @@ CompactionJob::CompactionJob(
const FileOptions& file_options, VersionSet* versions, const FileOptions& file_options, VersionSet* versions,
const std::atomic<bool>* shutting_down, const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats, FSDirectory* db_directory, FSDirectory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots, std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
@ -614,7 +614,7 @@ Status CompactionJob::Run() {
} }
if (status.ok() && output_directory_) { if (status.ok() && output_directory_) {
status = output_directory_->Fsync(); status = output_directory_->Fsync(IOOptions(), nullptr);
} }
if (status.ok()) { if (status.ok()) {

@ -67,8 +67,8 @@ class CompactionJob {
const FileOptions& file_options, VersionSet* versions, const FileOptions& file_options, VersionSet* versions,
const std::atomic<bool>* shutting_down, const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, const SequenceNumber preserve_deletes_seqnum,
LogBuffer* log_buffer, Directory* db_directory, LogBuffer* log_buffer, FSDirectory* db_directory,
Directory* output_directory, Statistics* stats, FSDirectory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots, std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
@ -161,8 +161,8 @@ class CompactionJob {
const std::atomic<bool>* manual_compaction_paused_; const std::atomic<bool>* manual_compaction_paused_;
const SequenceNumber preserve_deletes_seqnum_; const SequenceNumber preserve_deletes_seqnum_;
LogBuffer* log_buffer_; LogBuffer* log_buffer_;
Directory* db_directory_; FSDirectory* db_directory_;
Directory* output_directory_; FSDirectory* output_directory_;
Statistics* stats_; Statistics* stats_;
InstrumentedMutex* db_mutex_; InstrumentedMutex* db_mutex_;
ErrorHandler* db_error_handler_; ErrorHandler* db_error_handler_;

@ -890,9 +890,9 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
} }
} }
Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const { FSDirectory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
assert(cfd); assert(cfd);
Directory* ret_dir = cfd->GetDataDir(path_id); FSDirectory* ret_dir = cfd->GetDataDir(path_id);
if (ret_dir == nullptr) { if (ret_dir == nullptr) {
return directories_.GetDataDir(path_id); return directories_.GetDataDir(path_id);
} }
@ -1224,7 +1224,7 @@ Status DBImpl::SyncWAL() {
} }
} }
if (status.ok() && need_log_dir_sync) { if (status.ok() && need_log_dir_sync) {
status = directories_.GetWalDir()->Fsync(); status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
} }
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
@ -2316,7 +2316,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
auto* cfd = auto* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr); assert(cfd != nullptr);
std::map<std::string, std::shared_ptr<Directory>> dummy_created_dirs; std::map<std::string, std::shared_ptr<FSDirectory>> dummy_created_dirs;
s = cfd->AddDirectories(&dummy_created_dirs); s = cfd->AddDirectories(&dummy_created_dirs);
} }
if (s.ok()) { if (s.ok()) {

@ -82,13 +82,13 @@ struct MemTableInfo;
// Class to maintain directories for all database paths other than main one. // Class to maintain directories for all database paths other than main one.
class Directories { class Directories {
public: public:
Status SetDirectories(Env* env, const std::string& dbname, IOStatus SetDirectories(FileSystem* fs, const std::string& dbname,
const std::string& wal_dir, const std::string& wal_dir,
const std::vector<DbPath>& data_paths); const std::vector<DbPath>& data_paths);
Directory* GetDataDir(size_t path_id) const { FSDirectory* GetDataDir(size_t path_id) const {
assert(path_id < data_dirs_.size()); assert(path_id < data_dirs_.size());
Directory* ret_dir = data_dirs_[path_id].get(); FSDirectory* ret_dir = data_dirs_[path_id].get();
if (ret_dir == nullptr) { if (ret_dir == nullptr) {
// Should use db_dir_ // Should use db_dir_
return db_dir_.get(); return db_dir_.get();
@ -96,19 +96,19 @@ class Directories {
return ret_dir; return ret_dir;
} }
Directory* GetWalDir() { FSDirectory* GetWalDir() {
if (wal_dir_) { if (wal_dir_) {
return wal_dir_.get(); return wal_dir_.get();
} }
return db_dir_.get(); return db_dir_.get();
} }
Directory* GetDbDir() { return db_dir_.get(); } FSDirectory* GetDbDir() { return db_dir_.get(); }
private: private:
std::unique_ptr<Directory> db_dir_; std::unique_ptr<FSDirectory> db_dir_;
std::vector<std::unique_ptr<Directory>> data_dirs_; std::vector<std::unique_ptr<FSDirectory>> data_dirs_;
std::unique_ptr<Directory> wal_dir_; std::unique_ptr<FSDirectory> wal_dir_;
}; };
// While DB is the public interface of RocksDB, and DBImpl is the actual // While DB is the public interface of RocksDB, and DBImpl is the actual
@ -830,8 +830,9 @@ class DBImpl : public DB {
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr, std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
const bool seq_per_batch, const bool batch_per_txn); const bool seq_per_batch, const bool batch_per_txn);
static Status CreateAndNewDirectory(Env* env, const std::string& dirname, static IOStatus CreateAndNewDirectory(
std::unique_ptr<Directory>* directory); FileSystem* fs, const std::string& dirname,
std::unique_ptr<FSDirectory>* directory);
// find stats map from stats_history_ with smallest timestamp in // find stats map from stats_history_ with smallest timestamp in
// the range of [start_time, end_time) // the range of [start_time, end_time)
@ -1602,7 +1603,7 @@ class DBImpl : public DB {
uint64_t GetMaxTotalWalSize() const; uint64_t GetMaxTotalWalSize() const;
Directory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const; FSDirectory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const;
Status CloseHelper(); Status CloseHelper();

@ -117,7 +117,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
} }
} }
if (s.ok()) { if (s.ok()) {
s = directories_.GetWalDir()->Fsync(); s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
} }
mutex_.Lock(); mutex_.Lock();
@ -301,7 +301,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
GetSnapshotContext(job_context, &snapshot_seqs, GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker); &earliest_write_conflict_snapshot, &snapshot_checker);
autovector<Directory*> distinct_output_dirs; autovector<FSDirectory*> distinct_output_dirs;
autovector<std::string> distinct_output_dir_paths; autovector<std::string> distinct_output_dir_paths;
std::vector<std::unique_ptr<FlushJob>> jobs; std::vector<std::unique_ptr<FlushJob>> jobs;
std::vector<MutableCFOptions> all_mutable_cf_options; std::vector<MutableCFOptions> all_mutable_cf_options;
@ -309,7 +309,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
all_mutable_cf_options.reserve(num_cfs); all_mutable_cf_options.reserve(num_cfs);
for (int i = 0; i < num_cfs; ++i) { for (int i = 0; i < num_cfs; ++i) {
auto cfd = cfds[i]; auto cfd = cfds[i];
Directory* data_dir = GetDataDir(cfd, 0U); FSDirectory* data_dir = GetDataDir(cfd, 0U);
const std::string& curr_path = cfd->ioptions()->cf_paths[0].path; const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;
// Add to distinct output directories if eligible. Use linear search. Since // Add to distinct output directories if eligible. Use linear search. Since
@ -413,7 +413,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// Sync on all distinct output directories. // Sync on all distinct output directories.
for (auto dir : distinct_output_dirs) { for (auto dir : distinct_output_dirs) {
if (dir != nullptr) { if (dir != nullptr) {
Status error_status = dir->Fsync(); Status error_status = dir->Fsync(IOOptions(), nullptr);
if (!error_status.ok()) { if (!error_status.ok()) {
s = error_status; s = error_status;
break; break;

@ -301,8 +301,9 @@ Status DBImpl::NewDB() {
return s; return s;
} }
Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname, IOStatus DBImpl::CreateAndNewDirectory(
std::unique_ptr<Directory>* directory) { FileSystem* fs, const std::string& dirname,
std::unique_ptr<FSDirectory>* directory) {
// We call CreateDirIfMissing() as the directory may already exist (if we // We call CreateDirIfMissing() as the directory may already exist (if we
// are reopening a DB), when this happens we don't want creating the // are reopening a DB), when this happens we don't want creating the
// directory to cause an error. However, we need to check if creating the // directory to cause an error. However, we need to check if creating the
@ -310,24 +311,24 @@ Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname,
// file not existing. One real-world example of this occurring is if // file not existing. One real-world example of this occurring is if
// env->CreateDirIfMissing() doesn't create intermediate directories, e.g. // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
// when dbname_ is "dir/db" but when "dir" doesn't exist. // when dbname_ is "dir/db" but when "dir" doesn't exist.
Status s = env->CreateDirIfMissing(dirname); IOStatus io_s = fs->CreateDirIfMissing(dirname, IOOptions(), nullptr);
if (!s.ok()) { if (!io_s.ok()) {
return s; return io_s;
} }
return env->NewDirectory(dirname, directory); return fs->NewDirectory(dirname, IOOptions(), directory, nullptr);
} }
Status Directories::SetDirectories(Env* env, const std::string& dbname, IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
const std::string& wal_dir, const std::string& wal_dir,
const std::vector<DbPath>& data_paths) { const std::vector<DbPath>& data_paths) {
Status s = DBImpl::CreateAndNewDirectory(env, dbname, &db_dir_); IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_);
if (!s.ok()) { if (!io_s.ok()) {
return s; return io_s;
} }
if (!wal_dir.empty() && dbname != wal_dir) { if (!wal_dir.empty() && dbname != wal_dir) {
s = DBImpl::CreateAndNewDirectory(env, wal_dir, &wal_dir_); io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_);
if (!s.ok()) { if (!io_s.ok()) {
return s; return io_s;
} }
} }
@ -337,16 +338,16 @@ Status Directories::SetDirectories(Env* env, const std::string& dbname,
if (db_path == dbname) { if (db_path == dbname) {
data_dirs_.emplace_back(nullptr); data_dirs_.emplace_back(nullptr);
} else { } else {
std::unique_ptr<Directory> path_directory; std::unique_ptr<FSDirectory> path_directory;
s = DBImpl::CreateAndNewDirectory(env, db_path, &path_directory); io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory);
if (!s.ok()) { if (!io_s.ok()) {
return s; return io_s;
} }
data_dirs_.emplace_back(path_directory.release()); data_dirs_.emplace_back(path_directory.release());
} }
} }
assert(data_dirs_.size() == data_paths.size()); assert(data_dirs_.size() == data_paths.size());
return Status::OK(); return IOStatus::OK();
} }
Status DBImpl::Recover( Status DBImpl::Recover(
@ -358,7 +359,7 @@ Status DBImpl::Recover(
bool is_new_db = false; bool is_new_db = false;
assert(db_lock_ == nullptr); assert(db_lock_ == nullptr);
if (!read_only) { if (!read_only) {
Status s = directories_.SetDirectories(env_, dbname_, Status s = directories_.SetDirectories(fs_.get(), dbname_,
immutable_db_options_.wal_dir, immutable_db_options_.wal_dir,
immutable_db_options_.db_paths); immutable_db_options_.db_paths);
if (!s.ok()) { if (!s.ok()) {
@ -458,7 +459,7 @@ Status DBImpl::Recover(
s = CheckConsistency(); s = CheckConsistency();
} }
if (s.ok() && !read_only) { if (s.ok() && !read_only) {
std::map<std::string, std::shared_ptr<Directory>> created_dirs; std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
s = cfd->AddDirectories(&created_dirs); s = cfd->AddDirectories(&created_dirs);
if (!s.ok()) { if (!s.ok()) {
@ -1477,7 +1478,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
} }
impl->DeleteObsoleteFiles(); impl->DeleteObsoleteFiles();
s = impl->directories_.GetDbDir()->Fsync(); s = impl->directories_.GetDbDir()->Fsync(IOOptions(), nullptr);
} }
if (s.ok()) { if (s.ok()) {
// In WritePrepared there could be gap in sequence numbers. This breaks // In WritePrepared there could be gap in sequence numbers. This breaks

@ -1041,7 +1041,7 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
// We only sync WAL directory the first time WAL syncing is // We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync, // requested, so that in case users never turn on WAL sync,
// we can avoid the disk I/O in the write code path. // we can avoid the disk I/O in the write code path.
status = directories_.GetWalDir()->Fsync(); status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
} }
} }

@ -148,7 +148,7 @@ Status ExternalSstFileIngestionJob::Prepare(
TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir"); TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
if (status.ok()) { if (status.ok()) {
for (auto path_id : ingestion_path_ids) { for (auto path_id : ingestion_path_ids) {
status = directories_->GetDataDir(path_id)->Fsync(); status = directories_->GetDataDir(path_id)->Fsync(IOOptions(), nullptr);
if (!status.ok()) { if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, ROCKS_LOG_WARN(db_options_.info_log,
"Failed to sync directory %" ROCKSDB_PRIszt "Failed to sync directory %" ROCKSDB_PRIszt

@ -92,8 +92,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
std::vector<SequenceNumber> existing_snapshots, std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context, SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory, LogBuffer* log_buffer, FSDirectory* db_directory,
Directory* output_file_directory, FSDirectory* output_file_directory,
CompressionType output_compression, Statistics* stats, CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats, EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest, const bool sync_output_directory, const bool write_manifest,
@ -397,7 +397,7 @@ Status FlushJob::WriteLevel0Table() {
meta_.marked_for_compaction ? " (needs compaction)" : ""); meta_.marked_for_compaction ? " (needs compaction)" : "");
if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) { if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
s = output_file_directory_->Fsync(); s = output_file_directory_->Fsync(IOOptions(), nullptr);
} }
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_); TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
db_mutex_->Lock(); db_mutex_->Lock();

@ -67,9 +67,10 @@ class FlushJob {
std::vector<SequenceNumber> existing_snapshots, std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context, SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory, LogBuffer* log_buffer, FSDirectory* db_directory,
Directory* output_file_directory, CompressionType output_compression, FSDirectory* output_file_directory,
Statistics* stats, EventLogger* event_logger, bool measure_io_stats, CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest, const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri); Env::Priority thread_pri);
@ -117,8 +118,8 @@ class FlushJob {
SnapshotChecker* snapshot_checker_; SnapshotChecker* snapshot_checker_;
JobContext* job_context_; JobContext* job_context_;
LogBuffer* log_buffer_; LogBuffer* log_buffer_;
Directory* db_directory_; FSDirectory* db_directory_;
Directory* output_file_directory_; FSDirectory* output_file_directory_;
CompressionType output_compression_; CompressionType output_compression_;
Statistics* stats_; Statistics* stats_;
EventLogger* event_logger_; EventLogger* event_logger_;

@ -388,7 +388,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker, const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
autovector<MemTable*>* to_delete, Directory* db_directory, autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer, LogBuffer* log_buffer,
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info) { std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info) {
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
@ -642,7 +642,7 @@ Status InstallMemtableAtomicFlushResults(
const autovector<const MutableCFOptions*>& mutable_cf_options_list, const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset, const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas, InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
autovector<MemTable*>* to_delete, Directory* db_directory, autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer) { LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);

@ -140,7 +140,7 @@ class MemTableListVersion {
const autovector<const autovector<MemTable*>*>& mems_list, const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu, VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta, const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory, autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer); LogBuffer* log_buffer);
// REQUIRE: m is an immutable memtable // REQUIRE: m is an immutable memtable
@ -264,7 +264,7 @@ class MemTableList {
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker, const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker,
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
autovector<MemTable*>* to_delete, Directory* db_directory, autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer, LogBuffer* log_buffer,
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info); std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info);
@ -379,7 +379,7 @@ class MemTableList {
const autovector<const autovector<MemTable*>*>& mems_list, const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu, VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta, const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory, autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer); LogBuffer* log_buffer);
// DB mutex held // DB mutex held
@ -421,6 +421,6 @@ extern Status InstallMemtableAtomicFlushResults(
const autovector<const MutableCFOptions*>& mutable_cf_options_list, const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset, const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta, InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory, autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer); LogBuffer* log_buffer);
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -3600,7 +3600,7 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Status VersionSet::ProcessManifestWrites( Status VersionSet::ProcessManifestWrites(
std::deque<ManifestWriter>& writers, InstrumentedMutex* mu, std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
Directory* db_directory, bool new_descriptor_log, FSDirectory* db_directory, bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options) { const ColumnFamilyOptions* new_cf_options) {
assert(!writers.empty()); assert(!writers.empty());
ManifestWriter& first_writer = writers.front(); ManifestWriter& first_writer = writers.front();
@ -4026,7 +4026,7 @@ Status VersionSet::LogAndApply(
const autovector<ColumnFamilyData*>& column_family_datas, const autovector<ColumnFamilyData*>& column_family_datas,
const autovector<const MutableCFOptions*>& mutable_cf_options_list, const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<autovector<VersionEdit*>>& edit_lists, const autovector<autovector<VersionEdit*>>& edit_lists,
InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log, InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options) { const ColumnFamilyOptions* new_cf_options) {
mu->AssertHeld(); mu->AssertHeld();
int num_edits = 0; int num_edits = 0;

@ -822,7 +822,7 @@ class VersionSet {
Status LogAndApply( Status LogAndApply(
ColumnFamilyData* column_family_data, ColumnFamilyData* column_family_data,
const MutableCFOptions& mutable_cf_options, VersionEdit* edit, const MutableCFOptions& mutable_cf_options, VersionEdit* edit,
InstrumentedMutex* mu, Directory* db_directory = nullptr, InstrumentedMutex* mu, FSDirectory* db_directory = nullptr,
bool new_descriptor_log = false, bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options = nullptr) { const ColumnFamilyOptions* column_family_options = nullptr) {
autovector<ColumnFamilyData*> cfds; autovector<ColumnFamilyData*> cfds;
@ -842,7 +842,7 @@ class VersionSet {
ColumnFamilyData* column_family_data, ColumnFamilyData* column_family_data,
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
const autovector<VersionEdit*>& edit_list, InstrumentedMutex* mu, const autovector<VersionEdit*>& edit_list, InstrumentedMutex* mu,
Directory* db_directory = nullptr, bool new_descriptor_log = false, FSDirectory* db_directory = nullptr, bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options = nullptr) { const ColumnFamilyOptions* column_family_options = nullptr) {
autovector<ColumnFamilyData*> cfds; autovector<ColumnFamilyData*> cfds;
cfds.emplace_back(column_family_data); cfds.emplace_back(column_family_data);
@ -861,7 +861,7 @@ class VersionSet {
const autovector<ColumnFamilyData*>& cfds, const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list, const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<autovector<VersionEdit*>>& edit_lists, const autovector<autovector<VersionEdit*>>& edit_lists,
InstrumentedMutex* mu, Directory* db_directory = nullptr, InstrumentedMutex* mu, FSDirectory* db_directory = nullptr,
bool new_descriptor_log = false, bool new_descriptor_log = false,
const ColumnFamilyOptions* new_cf_options = nullptr); const ColumnFamilyOptions* new_cf_options = nullptr);
@ -1170,7 +1170,7 @@ class VersionSet {
private: private:
// REQUIRES db mutex at beginning. may release and re-acquire db mutex // REQUIRES db mutex at beginning. may release and re-acquire db mutex
Status ProcessManifestWrites(std::deque<ManifestWriter>& writers, Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,
InstrumentedMutex* mu, Directory* db_directory, InstrumentedMutex* mu, FSDirectory* db_directory,
bool new_descriptor_log, bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options); const ColumnFamilyOptions* new_cf_options);
@ -1237,7 +1237,7 @@ class ReactiveVersionSet : public VersionSet {
const autovector<ColumnFamilyData*>& /*cfds*/, const autovector<ColumnFamilyData*>& /*cfds*/,
const autovector<const MutableCFOptions*>& /*mutable_cf_options_list*/, const autovector<const MutableCFOptions*>& /*mutable_cf_options_list*/,
const autovector<autovector<VersionEdit*>>& /*edit_lists*/, const autovector<autovector<VersionEdit*>>& /*edit_lists*/,
InstrumentedMutex* /*mu*/, Directory* /*db_directory*/, InstrumentedMutex* /*mu*/, FSDirectory* /*db_directory*/,
bool /*new_descriptor_log*/, bool /*new_descriptor_log*/,
const ColumnFamilyOptions* /*new_cf_option*/) override { const ColumnFamilyOptions* /*new_cf_option*/) override {
return Status::NotSupported("not supported in reactive mode"); return Status::NotSupported("not supported in reactive mode");

@ -370,7 +370,7 @@ bool ParseFileName(const std::string& fname, uint64_t* number,
Status SetCurrentFile(Env* env, const std::string& dbname, Status SetCurrentFile(Env* env, const std::string& dbname,
uint64_t descriptor_number, uint64_t descriptor_number,
Directory* directory_to_fsync) { FSDirectory* directory_to_fsync) {
// Remove leading "dbname/" and add newline to manifest file name // Remove leading "dbname/" and add newline to manifest file name
std::string manifest = DescriptorFileName(dbname, descriptor_number); std::string manifest = DescriptorFileName(dbname, descriptor_number);
Slice contents = manifest; Slice contents = manifest;
@ -385,7 +385,7 @@ Status SetCurrentFile(Env* env, const std::string& dbname,
} }
if (s.ok()) { if (s.ok()) {
if (directory_to_fsync != nullptr) { if (directory_to_fsync != nullptr) {
s = directory_to_fsync->Fsync(); s = directory_to_fsync->Fsync(IOOptions(), nullptr);
} }
} else { } else {
env->DeleteFile(tmp); env->DeleteFile(tmp);

@ -17,6 +17,7 @@
#include "options/db_options.h" #include "options/db_options.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/file_system.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
@ -164,7 +165,7 @@ extern bool ParseFileName(const std::string& filename, uint64_t* number,
// specified number. // specified number.
extern Status SetCurrentFile(Env* env, const std::string& dbname, extern Status SetCurrentFile(Env* env, const std::string& dbname,
uint64_t descriptor_number, uint64_t descriptor_number,
Directory* directory_to_fsync); FSDirectory* directory_to_fsync);
// Make the IDENTITY file for the db // Make the IDENTITY file for the db
extern Status SetIdentityFile(Env* env, const std::string& dbname, extern Status SetIdentityFile(Env* env, const std::string& dbname,

Loading…
Cancel
Save