Store FileSystemPtr object that contains FileSystem ptr (#7180)

Summary:
As part of the IOTracing project, this PR
    1. Caches "FileSystemPtr" object(wrapper class that returns file system pointer based on tracing enabled) instead of "FileSystem" pointer.
    2. FileSystemPtr object is created using FileSystem pointer and IOTracer
    pointer.
    3. IOTracer shared_ptr is created in DBImpl and it is passed to different classes through constructor.
    4. When tracing is enabled through DB::StartIOTrace, FileSystemPtr
    returns FileSystemTracingWrapper pointer for tracing purpose and when
    it is disabled underlying FileSystem pointer is returned.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7180

Test Plan:
make check -j64
                COMPILE_WITH_TSAN=1 make check -j64

Reviewed By: anand1976

Differential Revision: D22987117

Pulled By: akankshamahajan15

fbshipit-source-id: 6073617e4c2d5bc363914f3a1f55ae3b0a58fbf1
main
Akanksha Mahajan 5 years ago committed by Facebook GitHub Bot
parent 2bc63e3aba
commit 1f9f630b27
  1. 1
      CMakeLists.txt
  2. 10
      db/compaction/compaction_job.cc
  3. 38
      db/compaction/compaction_job.h
  4. 18
      db/compaction/compaction_job_test.cc
  5. 14
      db/db_impl/db_impl.cc
  6. 2
      db/db_impl/db_impl.h
  7. 6
      db/db_impl/db_impl_compaction_flush.cc
  8. 2
      db/db_impl/db_impl_secondary.cc
  9. 11
      db/db_wal_test.cc
  10. 6
      db/external_sst_file_ingestion_job.cc
  11. 8
      db/external_sst_file_ingestion_job.h
  12. 8
      db/flush_job_test.cc
  13. 2
      db/import_column_family_job.cc
  14. 7
      db/import_column_family_job.h
  15. 6
      db/memtable_list_test.cc
  16. 2
      db/repair.cc
  17. 7
      db/transaction_log_impl.cc
  18. 3
      db/transaction_log_impl.h
  19. 31
      db/version_set.cc
  20. 12
      db/version_set.h
  21. 10
      db/version_set_test.cc
  22. 2
      db/wal_manager.cc
  23. 13
      db/wal_manager.h
  24. 14
      db/wal_manager_test.cc
  25. 4
      env/file_system_tracer.cc
  26. 18
      env/file_system_tracer.h
  27. 6
      tools/ldb_cmd.cc
  28. 2
      tools/ldb_cmd_test.cc

@ -1118,6 +1118,7 @@ if(WITH_TESTS)
table/table_test.cc
table/block_fetcher_test.cc
test_util/testutil_test.cc
trace_replay/io_tracer_test.cc
tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc
tools/ldb_cmd_test.cc
tools/reduce_levels_test.cc

@ -328,8 +328,9 @@ CompactionJob::CompactionJob(
const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::atomic<bool>* manual_compaction_paused,
const std::string& db_id, const std::string& db_session_id)
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_paused, const std::string& db_id,
const std::string& db_session_id)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
@ -340,7 +341,7 @@ CompactionJob::CompactionJob(
db_options_(db_options),
file_options_(file_options),
env_(db_options.env),
fs_(db_options.fs.get()),
fs_(db_options.fs, io_tracer),
file_options_for_read_(
fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
versions_(versions),
@ -1564,7 +1565,8 @@ Status CompactionJob::OpenCompactionOutputFile(
&syncpoint_arg);
#endif
Status s;
IOStatus io_s = NewWritableFile(fs_, fname, &writable_file, file_options_);
IOStatus io_s =
NewWritableFile(fs_.get(), fname, &writable_file, file_options_);
s = io_s;
if (sub_compact->io_status.ok()) {
sub_compact->io_status = io_s;

@ -62,25 +62,23 @@ class VersionSet;
// if needed.
class CompactionJob {
public:
CompactionJob(int job_id, Compaction* compaction,
const ImmutableDBOptions& db_options,
const FileOptions& file_options, VersionSet* versions,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
LogBuffer* log_buffer, FSDirectory* db_directory,
FSDirectory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname,
CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri,
const std::atomic<bool>* manual_compaction_paused = nullptr,
const std::string& db_id = "",
const std::string& db_session_id = "");
CompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const FileOptions& file_options, VersionSet* versions,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
FSDirectory* db_directory, FSDirectory* output_directory,
Statistics* stats, InstrumentedMutex* db_mutex,
ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_paused = nullptr,
const std::string& db_id = "", const std::string& db_session_id = "");
~CompactionJob();
@ -160,7 +158,7 @@ class CompactionJob {
const FileOptions file_options_;
Env* env_;
FileSystem* fs_;
FileSystemPtr fs_;
// env_option optimized for compaction table reads
FileOptions file_options_for_read_;
VersionSet* versions_;

@ -79,10 +79,10 @@ class CompactionJobTest : public testing::Test {
mutable_db_options_(),
table_cache_(NewLRUCache(50000, 16)),
write_buffer_manager_(db_options_.db_write_buffer_size),
versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_,
/*block_cache_tracer=*/nullptr)),
versions_(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)),
shutting_down_(false),
preserve_deletes_seqnum_(0),
mock_table_factory_(new mock::MockTableFactory()),
@ -249,10 +249,10 @@ class CompactionJobTest : public testing::Test {
void NewDB() {
DestroyDB(dbname_, Options());
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_,
/*block_cache_tracer=*/nullptr));
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
compaction_job_stats_.Reset();
SetIdentityFile(env_, dbname_);
@ -334,7 +334,7 @@ class CompactionJobTest : public testing::Test {
nullptr, nullptr, &mutex_, &error_handler_, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger, false, false, dbname_, &compaction_job_stats_,
Env::Priority::USER);
Env::Priority::USER, nullptr /* IOTracer */);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
compaction_job.Prepare();

@ -151,8 +151,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
initial_db_options_(SanitizeOptions(dbname, options)),
env_(initial_db_options_.env),
io_tracer_(std::make_shared<IOTracer>()),
fs_(initial_db_options_.env->GetFileSystem()),
immutable_db_options_(initial_db_options_),
fs_(immutable_db_options_.fs, io_tracer_),
mutable_db_options_(initial_db_options_),
stats_(immutable_db_options_.statistics.get()),
mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
@ -197,7 +197,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
unable_to_release_oldest_log_(false),
num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
wal_manager_(immutable_db_options_, file_options_, seq_per_batch),
wal_manager_(immutable_db_options_, file_options_, io_tracer_,
seq_per_batch),
#endif // ROCKSDB_LITE
event_logger_(immutable_db_options_.info_log.get()),
bg_work_paused_(0),
@ -245,7 +246,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
table_cache_.get(), write_buffer_manager_,
&write_controller_, &block_cache_tracer_));
&write_controller_, &block_cache_tracer_,
io_tracer_));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
@ -3931,7 +3933,7 @@ Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
std::string file_name =
TempOptionsFileName(GetName(), versions_->NewFileNumber());
Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name,
GetFileSystem());
fs_.get());
if (s.ok()) {
s = RenameTempFileToOptionsFile(file_name);
@ -4278,7 +4280,7 @@ Status DBImpl::IngestExternalFiles(
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
ingestion_jobs.emplace_back(
env_, versions_.get(), cfd, immutable_db_options_, file_options_,
&snapshots_, arg.options, &directories_, &event_logger_);
&snapshots_, arg.options, &directories_, &event_logger_, io_tracer_);
}
std::vector<std::pair<bool, Status>> exec_results;
for (size_t i = 0; i != num_cfs; ++i) {
@ -4552,7 +4554,7 @@ Status DBImpl::CreateColumnFamilyWithImport(
auto cfd = cfh->cfd();
ImportColumnFamilyJob import_job(env_, versions_.get(), cfd,
immutable_db_options_, file_options_,
import_options, metadata.files);
import_options, metadata.files, io_tracer_);
SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
VersionEdit dummy_edit;

@ -1012,8 +1012,8 @@ class DBImpl : public DB {
const DBOptions initial_db_options_;
Env* const env_;
std::shared_ptr<IOTracer> io_tracer_;
std::shared_ptr<FileSystem> fs_;
const ImmutableDBOptions immutable_db_options_;
FileSystemPtr fs_;
MutableDBOptions mutable_db_options_;
Statistics* stats_;
std::unordered_map<std::string, RecoveredTransaction*>

@ -1051,8 +1051,8 @@ Status DBImpl::CompactFilesImpl(
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_,
db_id_, db_session_id_);
&compaction_job_stats, Env::Priority::USER, io_tracer_,
&manual_compaction_paused_, db_id_, db_session_id_);
// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
@ -2846,7 +2846,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri,
&compaction_job_stats, thread_pri, io_tracer_,
is_manual ? &manual_compaction_paused_ : nullptr, db_id_,
db_session_id_);
compaction_job.Prepare();

@ -611,7 +611,7 @@ Status DB::OpenAsSecondary(
impl->versions_.reset(new ReactiveVersionSet(
dbname, &impl->immutable_db_options_, impl->file_options_,
impl->table_cache_.get(), impl->write_buffer_manager_,
&impl->write_controller_));
&impl->write_controller_, impl->io_tracer_));
impl->column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);

@ -936,12 +936,13 @@ class RecoveryTestHelper {
std::unique_ptr<WalManager> wal_manager;
WriteController write_controller;
versions.reset(new VersionSet(test->dbname_, &db_options, env_options,
table_cache.get(), &write_buffer_manager,
&write_controller,
/*block_cache_tracer=*/nullptr));
versions.reset(new VersionSet(
test->dbname_, &db_options, env_options, table_cache.get(),
&write_buffer_manager, &write_controller,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
wal_manager.reset(new WalManager(db_options, env_options));
wal_manager.reset(
new WalManager(db_options, env_options, /*io_tracer=*/nullptr));
std::unique_ptr<log::Writer> current_log_writer;

@ -136,7 +136,7 @@ Status ExternalSstFileIngestionJob::Prepare(
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
nullptr);
// CopyFile also sync the new file.
status = CopyFile(fs_, path_outside_db, path_inside_db, 0,
status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
db_options_.use_fsync);
}
TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
@ -194,7 +194,7 @@ Status ExternalSstFileIngestionJob::Prepare(
for (size_t i = 0; i < files_to_ingest_.size(); i++) {
std::string generated_checksum, generated_checksum_func_name;
IOStatus io_s = GenerateOneFileChecksum(
fs_, files_to_ingest_[i].internal_file_path,
fs_.get(), files_to_ingest_[i].internal_file_path,
db_options_.file_checksum_gen_factory.get(), &generated_checksum,
&generated_checksum_func_name,
ingestion_options_.verify_checksums_readahead_size,
@ -831,7 +831,7 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile(
}
std::string file_checksum, file_checksum_func_name;
IOStatus io_s = GenerateOneFileChecksum(
fs_, file_to_ingest->internal_file_path,
fs_.get(), file_to_ingest->internal_file_path,
db_options_.file_checksum_gen_factory.get(), &file_checksum,
&file_checksum_func_name,
ingestion_options_.verify_checksums_readahead_size,

@ -12,6 +12,7 @@
#include "db/dbformat.h"
#include "db/internal_stats.h"
#include "db/snapshot_impl.h"
#include "env/file_system_tracer.h"
#include "logging/event_logger.h"
#include "options/db_options.h"
#include "rocksdb/db.h"
@ -76,9 +77,10 @@ class ExternalSstFileIngestionJob {
const ImmutableDBOptions& db_options, const EnvOptions& env_options,
SnapshotList* db_snapshots,
const IngestExternalFileOptions& ingestion_options,
Directories* directories, EventLogger* event_logger)
Directories* directories, EventLogger* event_logger,
const std::shared_ptr<IOTracer>& io_tracer)
: env_(env),
fs_(db_options.fs.get()),
fs_(db_options.fs, io_tracer),
versions_(versions),
cfd_(cfd),
db_options_(db_options),
@ -167,7 +169,7 @@ class ExternalSstFileIngestionJob {
Status SyncIngestedFile(TWritableFile* file);
Env* env_;
FileSystem* fs_;
FileSystemPtr fs_;
VersionSet* versions_;
ColumnFamilyData* cfd_;
const ImmutableDBOptions& db_options_;

@ -55,10 +55,10 @@ class FlushJobTest : public testing::Test {
db_options_.env = env_;
db_options_.fs = fs_;
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_,
/*block_cache_tracer=*/nullptr));
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
EXPECT_OK(versions_->Recover(column_families, false));
}

@ -100,7 +100,7 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
}
}
if (!hardlink_files) {
status = CopyFile(fs_, path_outside_db, path_inside_db, 0,
status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
db_options_.use_fsync);
}
if (!status.ok()) {

@ -24,12 +24,13 @@ class ImportColumnFamilyJob {
const ImmutableDBOptions& db_options,
const EnvOptions& env_options,
const ImportColumnFamilyOptions& import_options,
const std::vector<LiveFileMetaData>& metadata)
const std::vector<LiveFileMetaData>& metadata,
const std::shared_ptr<IOTracer>& io_tracer)
: env_(env),
versions_(versions),
cfd_(cfd),
db_options_(db_options),
fs_(db_options_.fs.get()),
fs_(db_options_.fs, io_tracer),
env_options_(env_options),
import_options_(import_options),
metadata_(metadata) {}
@ -61,7 +62,7 @@ class ImportColumnFamilyJob {
VersionSet* versions_;
ColumnFamilyData* cfd_;
const ImmutableDBOptions& db_options_;
FileSystem* fs_;
const FileSystemPtr fs_;
const EnvOptions& env_options_;
autovector<IngestedFileInfo> files_to_import_;
VersionEdit edit_;

@ -100,7 +100,8 @@ class MemTableListTest : public testing::Test {
VersionSet versions(dbname, &immutable_db_options, env_options,
table_cache.get(), &write_buffer_manager,
&write_controller, /*block_cache_tracer=*/nullptr);
&write_controller, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr);
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());
@ -148,7 +149,8 @@ class MemTableListTest : public testing::Test {
VersionSet versions(dbname, &immutable_db_options, env_options,
table_cache.get(), &write_buffer_manager,
&write_controller, /*block_cache_tracer=*/nullptr);
&write_controller, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr);
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());

@ -117,7 +117,7 @@ class Repairer {
wc_(db_options_.delayed_write_rate),
vset_(dbname_, &immutable_db_options_, env_options_,
raw_table_cache_.get(), &wb_, &wc_,
/*block_cache_tracer=*/nullptr),
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr),
next_file_number_(1),
db_lock_(nullptr) {
for (const auto& cfd : column_families) {

@ -17,7 +17,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
const bool seq_per_batch)
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
: dir_(dir),
options_(options),
read_options_(read_options),
@ -30,7 +30,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
current_batch_seq_(0),
current_last_seq_(0),
versions_(versions),
seq_per_batch_(seq_per_batch) {
seq_per_batch_(seq_per_batch),
io_tracer_(io_tracer) {
assert(files_ != nullptr);
assert(versions_ != nullptr);
@ -42,7 +43,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
Status TransactionLogIteratorImpl::OpenLogFile(
const LogFile* log_file,
std::unique_ptr<SequentialFileReader>* file_reader) {
FileSystem* fs = options_->fs.get();
FileSystemPtr fs(options_->fs, io_tracer_);
std::unique_ptr<FSSequentialFile> file;
std::string fname;
Status s;

@ -63,7 +63,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
const bool seq_per_batch);
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer);
virtual bool Valid() override;
@ -122,6 +122,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
// Update current batch if a continuous batch is found, else return false
void UpdateCurrentWriteBatch(const Slice& record);
Status OpenLogReader(const LogFile* file);
std::shared_ptr<IOTracer> io_tracer_;
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -3615,12 +3615,13 @@ VersionSet::VersionSet(const std::string& dbname,
const FileOptions& storage_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer)
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer)
: column_family_set_(new ColumnFamilySet(
dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller, block_cache_tracer)),
env_(_db_options->env),
fs_(_db_options->fs.get()),
fs_(_db_options->fs, io_tracer),
dbname_(dbname),
db_options_(_db_options),
next_file_number_(2),
@ -3634,7 +3635,8 @@ VersionSet::VersionSet(const std::string& dbname,
current_version_number_(0),
manifest_file_size_(0),
file_options_(storage_options),
block_cache_tracer_(block_cache_tracer) {}
block_cache_tracer_(block_cache_tracer),
io_tracer_(io_tracer) {}
VersionSet::~VersionSet() {
// we need to delete column_family_set_ because its destructor depends on
@ -3937,7 +3939,7 @@ Status VersionSet::ProcessManifestWrites(
std::string descriptor_fname =
DescriptorFileName(dbname_, pending_manifest_file_number_);
std::unique_ptr<FSWritableFile> descriptor_file;
io_s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file,
opt_file_opts);
if (io_s.ok()) {
descriptor_file->SetPreallocationBlockSize(
@ -4006,7 +4008,7 @@ Status VersionSet::ProcessManifestWrites(
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && new_descriptor_log) {
io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_,
io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
db_directory);
if (!io_s.ok()) {
s = io_s;
@ -4536,7 +4538,7 @@ Status VersionSet::Recover(
// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string manifest_path;
Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
&manifest_file_number_);
if (!s.ok()) {
return s;
@ -4943,7 +4945,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
WriteController wc(options->delayed_write_rate);
WriteBufferManager wb(options->db_write_buffer_size);
VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr);
nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/);
Status status;
std::vector<ColumnFamilyDescriptor> dummy;
@ -5925,15 +5927,14 @@ Status VersionSet::VerifyFileMetadata(const std::string& fpath,
return status;
}
ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
const ImmutableDBOptions* _db_options,
const FileOptions& _file_options,
Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller)
ReactiveVersionSet::ReactiveVersionSet(
const std::string& dbname, const ImmutableDBOptions* _db_options,
const FileOptions& _file_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager, WriteController* write_controller,
const std::shared_ptr<IOTracer>& io_tracer)
: VersionSet(dbname, _db_options, _file_options, table_cache,
write_buffer_manager, write_controller,
/*block_cache_tracer=*/nullptr),
/*block_cache_tracer=*/nullptr, io_tracer),
number_of_edits_to_skip_(0) {}
ReactiveVersionSet::~ReactiveVersionSet() {}
@ -6345,7 +6346,7 @@ Status ReactiveVersionSet::MaybeSwitchManifest(
Status s;
do {
std::string manifest_path;
s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
&manifest_file_number_);
std::unique_ptr<FSSequentialFile> manifest_file;
if (s.ok()) {

@ -42,6 +42,7 @@
#include "db/version_builder.h"
#include "db/version_edit.h"
#include "db/write_controller.h"
#include "env/file_system_tracer.h"
#include "monitoring/instrumented_mutex.h"
#include "options/db_options.h"
#include "port/port.h"
@ -763,7 +764,6 @@ class Version {
private:
Env* env_;
FileSystem* fs_;
friend class ReactiveVersionSet;
friend class VersionSet;
friend class VersionEditHandler;
@ -897,7 +897,8 @@ class VersionSet {
const FileOptions& file_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer);
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer);
// No copying allowed
VersionSet(const VersionSet&) = delete;
void operator=(const VersionSet&) = delete;
@ -1277,7 +1278,7 @@ class VersionSet {
std::unique_ptr<ColumnFamilySet> column_family_set_;
Env* const env_;
FileSystem* const fs_;
FileSystemPtr const fs_;
const std::string dbname_;
std::string db_id_;
const ImmutableDBOptions* const db_options_;
@ -1330,6 +1331,8 @@ class VersionSet {
// Store the IO status when Manifest is written
IOStatus io_status_;
std::shared_ptr<IOTracer> io_tracer_;
private:
// REQUIRES db mutex at beginning. may release and re-acquire db mutex
Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,
@ -1352,7 +1355,8 @@ class ReactiveVersionSet : public VersionSet {
const ImmutableDBOptions* _db_options,
const FileOptions& _file_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller);
WriteController* write_controller,
const std::shared_ptr<IOTracer>& io_tracer);
~ReactiveVersionSet() override;

@ -726,13 +726,13 @@ class VersionSetTestBase {
db_options_.env = env_;
db_options_.fs = fs_;
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_,
/*block_cache_tracer=*/nullptr));
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
reactive_versions_ = std::make_shared<ReactiveVersionSet>(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_);
&write_buffer_manager_, &write_controller_, nullptr);
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
}

@ -121,7 +121,7 @@ Status WalManager::GetUpdatesSince(
}
iter->reset(new TransactionLogIteratorImpl(
db_options_.wal_dir, &db_options_, read_options, file_options_, seq,
std::move(wal_files), version_set, seq_per_batch_));
std::move(wal_files), version_set, seq_per_batch_, io_tracer_));
return (*iter)->status();
}

@ -36,14 +36,17 @@ namespace ROCKSDB_NAMESPACE {
class WalManager {
public:
WalManager(const ImmutableDBOptions& db_options,
const FileOptions& file_options, const bool seq_per_batch = false)
const FileOptions& file_options,
const std::shared_ptr<IOTracer>& io_tracer,
const bool seq_per_batch = false)
: db_options_(db_options),
file_options_(file_options),
env_(db_options.env),
fs_(db_options.fs.get()),
fs_(db_options.fs, io_tracer),
purge_wal_files_last_run_(0),
seq_per_batch_(seq_per_batch),
wal_in_db_path_(IsWalDirSameAsDBPath(&db_options)) {}
wal_in_db_path_(IsWalDirSameAsDBPath(&db_options)),
io_tracer_(io_tracer) {}
Status GetSortedWalFiles(VectorLogPtr& files);
@ -91,7 +94,7 @@ class WalManager {
const ImmutableDBOptions& db_options_;
const FileOptions file_options_;
Env* env_;
FileSystem* fs_;
const FileSystemPtr fs_;
// ------- WalManager state -------
// cache for ReadFirstRecord() calls
@ -108,6 +111,8 @@ class WalManager {
// obsolete files will be deleted every this seconds if ttl deletion is
// enabled and archive size_limit is disabled.
static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;
std::shared_ptr<IOTracer> io_tracer_;
};
#endif // ROCKSDB_LITE

@ -50,16 +50,18 @@ class WalManagerTest : public testing::Test {
fs_.reset(new LegacyFileSystemWrapper(env_.get()));
db_options_.fs = fs_;
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_,
/*block_cache_tracer=*/nullptr));
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
wal_manager_.reset(new WalManager(db_options_, env_options_));
wal_manager_.reset(
new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));
}
void Reopen() {
wal_manager_.reset(new WalManager(db_options_, env_options_));
wal_manager_.reset(
new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));
}
// NOT thread safe

@ -16,8 +16,8 @@ IOStatus FileSystemTracingWrapper::NewWritableFile(
timer.Start();
IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(elapsed, TraceType::kIOFileName, __func__, elapsed,
s.ToString(), fname);
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
elapsed, s.ToString(), fname);
io_tracer_->WriteIOOp(io_record);
return s;
}

@ -66,11 +66,10 @@ class FileSystemTracingWrapper : public FileSystemWrapper {
class FileSystemPtr {
public:
FileSystemPtr(std::shared_ptr<FileSystem> fs,
std::shared_ptr<IOTracer> io_tracer)
: fs_(fs),
io_tracer_(io_tracer),
fs_tracer_(
std::make_shared<FileSystemTracingWrapper>(fs_, io_tracer_)) {}
const std::shared_ptr<IOTracer>& io_tracer)
: fs_(fs), io_tracer_(io_tracer) {
fs_tracer_ = std::make_shared<FileSystemTracingWrapper>(fs_, io_tracer_);
}
std::shared_ptr<FileSystem> operator->() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
@ -80,6 +79,15 @@ class FileSystemPtr {
}
}
/* Returns the underlying File System pointer */
FileSystem* get() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_.get();
} else {
return fs_.get();
}
}
private:
std::shared_ptr<FileSystem> fs_;
std::shared_ptr<IOTracer> io_tracer_;

@ -1030,7 +1030,7 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex,
WriteBufferManager wb(options.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options);
VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr);
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr);
Status s = versions.DumpManifest(options, file, verbose, hex, json);
if (!s.ok()) {
fprintf(stderr, "Error in processing file %s %s\n", file.c_str(),
@ -1172,7 +1172,7 @@ void GetLiveFilesChecksumInfoFromVersionSet(Options options,
WriteBufferManager wb(options.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options);
VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr);
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr);
std::vector<std::string> cf_name_list;
s = versions.ListColumnFamilies(&cf_name_list, db_path,
immutable_db_options.fs.get());
@ -1892,7 +1892,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
WriteController wc(opt.delayed_write_rate);
WriteBufferManager wb(opt.db_write_buffer_size);
VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr);
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr);
std::vector<ColumnFamilyDescriptor> dummy;
ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
ColumnFamilyOptions(opt));

@ -205,7 +205,7 @@ class FileChecksumTestHelper {
WriteBufferManager wb(options_.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options_);
VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb,
&wc, nullptr);
&wc, nullptr, nullptr);
std::vector<std::string> cf_name_list;
Status s;
s = versions.ListColumnFamilies(&cf_name_list, dbname_,

Loading…
Cancel
Save