Store FSWritableFilePtr object in WritableFileWriter (#7193)

Summary:
Replace FSWritableFile pointer with FSWritableFilePtr
    object in WritableFileWriter.
    This new object wraps FSWritableFile pointer.

    Objective: If tracing is enabled, FSWritableFile Ptr returns
    FSWritableFileTracingWrapper pointer that includes all necessary
    information in IORecord and calls underlying FileSystem and invokes
    IOTracer to dump that record in a binary file. If tracing is disabled
    then, underlying FileSystem pointer is returned directly.
    FSWritableFilePtr wrapper class is added to bypass the
    FSWritableFileWrapper when
    tracing is disabled.

    Test Plan: make check -j64

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7193

Reviewed By: anand1976

Differential Revision: D23355915

Pulled By: akankshamahajan15

fbshipit-source-id: e62a27a13c1fd77e36a6dbafc7006d969bed25cf
main
Akanksha Mahajan 4 years ago committed by Facebook GitHub Bot
parent 4e258d3e63
commit b175eceb09
  1. 8
      db/blob/blob_file_builder.cc
  2. 8
      db/builder.cc
  3. 3
      db/builder.h
  4. 9
      db/compaction/compaction_job.cc
  5. 1
      db/compaction/compaction_job.h
  6. 4
      db/db_impl/db_impl_compaction_flush.cc
  7. 17
      db/db_impl/db_impl_open.cc
  8. 37
      db/flush_job.cc
  9. 5
      db/flush_job.h
  10. 66
      db/flush_job_test.cc
  11. 10
      db/repair.cc
  12. 15
      db/version_edit_handler.cc
  13. 6
      db/version_edit_handler.h
  14. 26
      db/version_set.cc
  15. 7
      db/version_set.h
  16. 34
      env/file_system_tracer.h
  17. 2
      file/writable_file_writer.cc
  18. 9
      file/writable_file_writer.h
  19. 5
      table/sst_file_writer.cc

@ -167,10 +167,10 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
Statistics* const statistics = immutable_cf_options_->statistics;
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), blob_file_path, *file_options_,
env_, statistics, immutable_cf_options_->listeners,
immutable_cf_options_->file_checksum_gen_factory));
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(file), blob_file_path, *file_options_, env_,
nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners,
immutable_cf_options_->file_checksum_gen_factory));
std::unique_ptr<BlobLogWriter> blob_log_writer(
new BlobLogWriter(std::move(file_writer), env_, statistics,

@ -83,7 +83,8 @@ Status BuildTable(
uint64_t sample_for_compression, const CompressionOptions& compression_opts,
bool paranoid_file_checks, InternalStats* internal_stats,
TableFileCreationReason reason, IOStatus* io_status,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
const std::shared_ptr<IOTracer>& io_tracer, EventLogger* event_logger,
int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level, const uint64_t creation_time,
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint,
const uint64_t file_creation_time, const std::string& db_id,
@ -143,8 +144,9 @@ Status BuildTable(
file->SetWriteLifeTimeHint(write_hint);
file_writer.reset(new WritableFileWriter(
std::move(file), fname, file_options, env, ioptions.statistics,
ioptions.listeners, ioptions.file_checksum_gen_factory));
std::move(file), fname, file_options, env, io_tracer,
ioptions.statistics, ioptions.listeners,
ioptions.file_checksum_gen_factory));
builder = NewTableBuilder(
ioptions, mutable_cf_options, internal_comparator,

@ -79,7 +79,8 @@ extern Status BuildTable(
const uint64_t sample_for_compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
IOStatus* io_status, EventLogger* event_logger = nullptr, int job_id = 0,
IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer,
EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr, int level = -1,
const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0,

@ -303,6 +303,7 @@ CompactionJob::CompactionJob(
db_options_(db_options),
file_options_(file_options),
env_(db_options.env),
io_tracer_(io_tracer),
fs_(db_options.fs, io_tracer),
file_options_for_read_(
fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
@ -1592,10 +1593,10 @@ Status CompactionJob::OpenCompactionOutputFile(
sub_compact->compaction->OutputFilePreallocationSize()));
const auto& listeners =
sub_compact->compaction->immutable_cf_options()->listeners;
sub_compact->outfile.reset(
new WritableFileWriter(std::move(writable_file), fname, file_options_,
env_, db_options_.statistics.get(), listeners,
db_options_.file_checksum_gen_factory.get()));
sub_compact->outfile.reset(new WritableFileWriter(
std::move(writable_file), fname, file_options_, env_, io_tracer_,
db_options_.statistics.get(), listeners,
db_options_.file_checksum_gen_factory.get()));
// If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where

@ -158,6 +158,7 @@ class CompactionJob {
const FileOptions file_options_;
Env* env_;
std::shared_ptr<IOTracer> io_tracer_;
FileSystemPtr fs_;
// env_option optimized for compaction table reads
FileOptions file_options_for_read_;

@ -154,7 +154,7 @@ Status DBImpl::FlushMemTableToOutputFile(
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */, thread_pri,
db_id_, db_session_id_);
io_tracer_, db_id_, db_session_id_);
FileMetaData file_meta;
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
@ -359,7 +359,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */,
thread_pri, db_id_, db_session_id_));
thread_pri, io_tracer_, db_id_, db_session_id_));
jobs.back()->PickMemTable();
}

@ -290,8 +290,8 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
file->SetPreallocationBlockSize(
immutable_db_options_.manifest_preallocation_size);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(file), manifest, file_options, env_, nullptr /* stats */,
immutable_db_options_.listeners));
std::move(file), manifest, file_options, env_, io_tracer_,
nullptr /* stats */, immutable_db_options_.listeners));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
@ -1330,9 +1330,10 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
mutable_cf_options.sample_for_compression,
mutable_cf_options.compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s,
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, 0 /* oldest_key_time */, write_hint,
0 /* file_creation_time */, db_id_, db_session_id_);
io_tracer_, &event_logger_, job_id, Env::IO_HIGH,
nullptr /* table_properties */, -1 /* level */, current_time,
0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */,
db_id_, db_session_id_);
LogFlush(immutable_db_options_.info_log);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
@ -1436,9 +1437,9 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
lfile->SetPreallocationBlockSize(preallocate_block_size);
const auto& listeners = immutable_db_options_.listeners;
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), log_fname, opt_file_options,
env_, nullptr /* stats */, listeners));
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(lfile), log_fname, opt_file_options, env_, io_tracer_,
nullptr /* stats */, listeners));
*new_log = new log::Writer(std::move(file_writer), log_file_num,
immutable_db_options_.recycle_log_file_num > 0,
immutable_db_options_.manual_wal_flush);

@ -82,23 +82,21 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
}
}
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const uint64_t* max_memtable_id,
const FileOptions& file_options, VersionSet* versions,
InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, FSDirectory* db_directory,
FSDirectory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::string& db_id,
const std::string& db_session_id)
FlushJob::FlushJob(
const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options, const uint64_t* max_memtable_id,
const FileOptions& file_options, VersionSet* versions,
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, FSDirectory* db_directory,
FSDirectory* output_file_directory, CompressionType output_compression,
Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id, const std::string& db_session_id)
: dbname_(dbname),
db_id_(db_id),
db_session_id_(db_session_id),
@ -126,7 +124,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
edit_(nullptr),
base_(nullptr),
pick_memtable_called(false),
thread_pri_(thread_pri) {
thread_pri_(thread_pri),
io_tracer_(io_tracer) {
// Update the thread status to indicate flush.
ReportStartedFlush();
TEST_SYNC_POINT("FlushJob::FlushJob()");
@ -400,7 +399,7 @@ Status FlushJob::WriteLevel0Table() {
output_compression_, mutable_cf_options_.sample_for_compression,
mutable_cf_options_.compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, &io_s, event_logger_,
TableFileCreationReason::kFlush, &io_s, io_tracer_, event_logger_,
job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */,
creation_time, oldest_key_time, write_hint, current_time, db_id_,
db_session_id_);

@ -72,7 +72,8 @@ class FlushJob {
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::string& db_id = "",
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id = "",
const std::string& db_session_id = "");
~FlushJob();
@ -161,6 +162,8 @@ class FlushJob {
bool pick_memtable_called;
Env::Priority thread_pri_;
IOStatus io_status_;
const std::shared_ptr<IOTracer> io_tracer_;
};
} // namespace ROCKSDB_NAMESPACE

@ -135,14 +135,14 @@ TEST_F(FlushJobTest, Empty) {
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
nullptr /* memtable_id */, env_options_, versions_.get(),
&mutex_, &shutting_down_, {}, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr,
kNoCompression, nullptr, &event_logger, false,
true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
env_options_, versions_.get(), &mutex_, &shutting_down_, {},
kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
nullptr, kNoCompression, nullptr, &event_logger, false,
true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/);
{
InstrumentedMutexLock l(&mutex_);
flush_job.PickMemTable();
@ -216,14 +216,14 @@ TEST_F(FlushJobTest, NonEmpty) {
EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
nullptr /* memtable_id */, env_options_, versions_.get(),
&mutex_, &shutting_down_, {}, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr,
kNoCompression, db_options_.statistics.get(),
&event_logger, true, true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
env_options_, versions_.get(), &mutex_, &shutting_down_, {},
kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
true, true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/);
HistogramData hist;
FileMetaData file_meta;
@ -278,14 +278,14 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
assert(memtable_ids.size() == num_mems);
uint64_t smallest_memtable_id = memtable_ids.front();
uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
&flush_memtable_id, env_options_, versions_.get(), &mutex_,
&shutting_down_, {}, kMaxSequenceNumber, snapshot_checker,
&job_context, nullptr, nullptr, nullptr, kNoCompression,
db_options_.statistics.get(), &event_logger, true,
true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), &flush_memtable_id, env_options_,
versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
db_options_.statistics.get(), &event_logger, true,
true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/);
HistogramData hist;
FileMetaData file_meta;
mutex_.Lock();
@ -357,7 +357,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
&job_context, nullptr, nullptr, nullptr, kNoCompression,
db_options_.statistics.get(), &event_logger, true,
false /* sync_output_directory */, false /* write_manifest */,
Env::Priority::USER));
Env::Priority::USER, nullptr /*IOTracer*/));
k++;
}
HistogramData hist;
@ -466,14 +466,14 @@ TEST_F(FlushJobTest, Snapshots) {
EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
nullptr /* memtable_id */, env_options_, versions_.get(),
&mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr,
kNoCompression, db_options_.statistics.get(),
&event_logger, true, true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots,
kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
true, true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/);
mutex_.Lock();
flush_job.PickMemTable();
ASSERT_OK(flush_job.Run());

@ -436,11 +436,11 @@ class Repairer {
cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber,
snapshot_checker, kNoCompression, 0 /* sample_for_compression */,
CompressionOptions(), false, nullptr /* internal_stats */,
TableFileCreationReason::kRecovery, &io_s, nullptr /* event_logger */,
0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, 0 /* oldest_key_time */, write_hint,
0 /* file_creation_time */, "DB Repairer" /* db_id */,
db_session_id_);
TableFileCreationReason::kRecovery, &io_s, nullptr /*IOTracer*/,
nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,
nullptr /* table_properties */, -1 /* level */, current_time,
0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */,
"DB Repairer" /* db_id */, db_session_id_);
ROCKS_LOG_INFO(db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
log, counter, meta.fd.GetNumber(),

@ -16,14 +16,16 @@ namespace ROCKSDB_NAMESPACE {
VersionEditHandler::VersionEditHandler(
bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set, bool track_missing_files,
bool no_error_if_table_files_missing)
bool no_error_if_table_files_missing,
const std::shared_ptr<IOTracer>& io_tracer)
: read_only_(read_only),
column_families_(column_families),
status_(),
version_set_(version_set),
track_missing_files_(track_missing_files),
no_error_if_table_files_missing_(no_error_if_table_files_missing),
initialized_(false) {
initialized_(false),
io_tracer_(io_tracer) {
assert(version_set_ != nullptr);
}
@ -390,7 +392,7 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
auto* v = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
*cfd->GetLatestMutableCFOptions(), io_tracer_,
version_set_->current_version_number_++);
s = builder->SaveTo(v->storage_info());
if (s.ok()) {
@ -485,10 +487,11 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set)
VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer)
: VersionEditHandler(read_only, column_families, version_set,
/*track_missing_files=*/true,
/*no_error_if_table_files_missing=*/true) {}
/*no_error_if_table_files_missing=*/true, io_tracer),
io_tracer_(io_tracer) {}
VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() {
for (const auto& elem : versions_) {
@ -573,7 +576,7 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
auto* version = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
*cfd->GetLatestMutableCFOptions(), io_tracer_,
version_set_->current_version_number_++);
s = builder->SaveTo(version->storage_info());
if (s.ok()) {

@ -36,7 +36,7 @@ class VersionEditHandler {
bool read_only,
const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set, bool track_missing_files,
bool ignore_missing_files);
bool ignore_missing_files, const std::shared_ptr<IOTracer>& io_tracer);
virtual ~VersionEditHandler() {}
@ -95,6 +95,7 @@ class VersionEditHandler {
const VersionEdit& edit);
bool initialized_;
std::shared_ptr<IOTracer> io_tracer_;
};
// A class similar to its base class, i.e. VersionEditHandler.
@ -108,7 +109,7 @@ class VersionEditHandlerPointInTime : public VersionEditHandler {
VersionEditHandlerPointInTime(
bool read_only,
const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set);
VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer);
~VersionEditHandlerPointInTime() override;
protected:
@ -119,6 +120,7 @@ class VersionEditHandlerPointInTime : public VersionEditHandler {
private:
std::unordered_map<uint32_t, Version*> versions_;
std::shared_ptr<IOTracer> io_tracer_;
};
} // namespace ROCKSDB_NAMESPACE

@ -1284,7 +1284,7 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
// pass the magic number check in the footer.
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
std::move(file), file_name, nullptr /* env */, nullptr /* IOTracer */,
std::move(file), file_name, nullptr /* env */, io_tracer_,
nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
nullptr /* rate_limiter */, ioptions->listeners));
s = ReadTableProperties(
@ -1750,6 +1750,7 @@ VersionStorageInfo::VersionStorageInfo(
Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
const FileOptions& file_opt,
const MutableCFOptions mutable_cf_options,
const std::shared_ptr<IOTracer>& io_tracer,
uint64_t version_number)
: env_(vset->env_),
cfd_(column_family_data),
@ -1777,7 +1778,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
mutable_cf_options_(mutable_cf_options),
max_file_size_for_l0_meta_pin_(
MaxFileSizeForL0MetaPin(mutable_cf_options_)),
version_number_(version_number) {}
version_number_(version_number),
io_tracer_(io_tracer) {}
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
PinnableSlice* value, std::string* timestamp, Status* status,
@ -3805,7 +3807,7 @@ Status VersionSet::ProcessManifestWrites(
}
if (version == nullptr) {
version = new Version(last_writer->cfd, this, file_options_,
last_writer->mutable_cf_options,
last_writer->mutable_cf_options, io_tracer_,
current_version_number_++);
versions.push_back(version);
mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
@ -3962,7 +3964,7 @@ Status VersionSet::ProcessManifestWrites(
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
nullptr, db_options_->listeners));
io_tracer_, nullptr, db_options_->listeners));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get(),
@ -4687,7 +4689,7 @@ Status VersionSet::Recover(
}
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
*cfd->GetLatestMutableCFOptions(), io_tracer_,
current_version_number_++);
s = builder->SaveTo(v->storage_info());
if (!s.ok()) {
@ -4863,8 +4865,8 @@ Status VersionSet::TryRecoverFromOneManifest(
reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
/*checksum=*/true, /*log_num=*/0);
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
const_cast<VersionSet*>(this));
VersionEditHandlerPointInTime handler_pit(
read_only, column_families, const_cast<VersionSet*>(this), io_tracer_);
handler_pit.Iterate(reader, &s, db_id);
@ -5242,7 +5244,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
auto builder = builders_iter->second->version_builder();
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
*cfd->GetLatestMutableCFOptions(), io_tracer_,
current_version_number_++);
s = builder->SaveTo(v->storage_info());
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
@ -5881,7 +5883,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
MutableCFOptions dummy_cf_options;
Version* dummy_versions =
new Version(nullptr, this, file_options_, dummy_cf_options);
new Version(nullptr, this, file_options_, dummy_cf_options, io_tracer_);
// Ref() dummy version once so that later we can call Unref() to delete it
// by avoiding calling "delete" explicitly (~Version is private)
dummy_versions->Ref();
@ -5890,7 +5892,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
cf_options);
Version* v = new Version(new_cfd, this, file_options_,
*new_cfd->GetLatestMutableCFOptions(),
*new_cfd->GetLatestMutableCFOptions(), io_tracer_,
current_version_number_++);
// Fill level target base information.
@ -6079,7 +6081,7 @@ Status ReactiveVersionSet::Recover(
auto* builder = builders_iter->second->version_builder();
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
*cfd->GetLatestMutableCFOptions(), io_tracer_,
current_version_number_++);
s = builder->SaveTo(v->storage_info());
@ -6314,7 +6316,7 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
if (s.ok()) {
auto version = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
*cfd->GetLatestMutableCFOptions(), io_tracer_,
current_version_number_++);
s = builder->SaveTo(version->storage_info());
if (s.ok()) {

@ -815,9 +815,12 @@ class Version {
// A version number that uniquely represents this version. This is
// used for debugging and logging purposes only.
uint64_t version_number_;
std::shared_ptr<IOTracer> io_tracer_;
Version(ColumnFamilyData* cfd, VersionSet* vset, const FileOptions& file_opt,
MutableCFOptions mutable_cf_options, uint64_t version_number = 0);
MutableCFOptions mutable_cf_options,
const std::shared_ptr<IOTracer>& io_tracer,
uint64_t version_number = 0);
~Version();
@ -1193,7 +1196,7 @@ class VersionSet {
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
Version* const version =
new Version(cfd, this, file_options_, mutable_cf_options);
new Version(cfd, this, file_options_, mutable_cf_options, io_tracer_);
constexpr bool update_stats = false;
version->PrepareApply(mutable_cf_options, update_stats);

@ -270,26 +270,38 @@ class FSWritableFileTracingWrapper : public FSWritableFileWrapper {
// FSWritableFileTracingWrapper when tracing is disabled.
class FSWritableFilePtr {
public:
FSWritableFilePtr(FSWritableFile* fs, std::shared_ptr<IOTracer> io_tracer)
: fs_(fs),
io_tracer_(io_tracer),
fs_tracer_(new FSWritableFileTracingWrapper(fs_, io_tracer_)) {}
explicit FSWritableFilePtr(FSWritableFile* fs)
: fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
FSWritableFilePtr(std::unique_ptr<FSWritableFile>&& fs,
const std::shared_ptr<IOTracer>& io_tracer)
: fs_(std::move(fs)), io_tracer_(io_tracer) {
fs_tracer_.reset(new FSWritableFileTracingWrapper(fs_.get(), io_tracer_));
}
FSWritableFile* operator->() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_;
return fs_tracer_.get();
} else {
return fs_;
return fs_.get();
}
}
FSWritableFile* get() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_.get();
} else {
return fs_.get();
}
}
void reset() {
fs_.reset();
fs_tracer_.reset();
io_tracer_ = nullptr;
}
private:
FSWritableFile* fs_;
std::unique_ptr<FSWritableFile> fs_;
std::shared_ptr<IOTracer> io_tracer_;
FSWritableFileTracingWrapper* fs_tracer_;
std::unique_ptr<FSWritableFileTracingWrapper> fs_tracer_;
};
// FSRandomRWFileTracingWrapper is a wrapper class above FSRandomRWFile that

@ -129,7 +129,7 @@ IOStatus WritableFileWriter::Close() {
// in __dtor, simply flushing is not enough
// Windows when pre-allocating does not fill with zeros
// also with unbuffered access we also set the end of data.
if (!writable_file_) {
if (writable_file_.get() == nullptr) {
return s;
}

@ -10,7 +10,9 @@
#pragma once
#include <atomic>
#include <string>
#include "db/version_edit.h"
#include "env/file_system_tracer.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
@ -117,8 +119,8 @@ class WritableFileWriter {
bool ShouldNotifyListeners() const { return !listeners_.empty(); }
void UpdateFileChecksum(const Slice& data);
std::unique_ptr<FSWritableFile> writable_file_;
std::string file_name_;
FSWritableFilePtr writable_file_;
Env* env_;
AlignedBuffer buf_;
size_t max_buffer_size_;
@ -144,11 +146,12 @@ class WritableFileWriter {
WritableFileWriter(
std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name,
const FileOptions& options, Env* env = nullptr,
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
Statistics* stats = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
FileChecksumGenFactory* file_checksum_gen_factory = nullptr)
: writable_file_(std::move(file)),
file_name_(_file_name),
: file_name_(_file_name),
writable_file_(std::move(file), io_tracer),
env_(env),
buf_(),
max_buffer_size_(options.writable_file_max_buffer_size),

@ -255,8 +255,9 @@ Status SstFileWriter::Open(const std::string& file_path) {
0 /* file_creation_time */, "SST Writer" /* db_id */, db_session_id);
r->file_writer.reset(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(sst_file)), file_path,
r->env_options, r->ioptions.env, nullptr /* stats */,
r->ioptions.listeners, r->ioptions.file_checksum_gen_factory));
r->env_options, r->ioptions.env, nullptr /* io_tracer */,
nullptr /* stats */, r->ioptions.listeners,
r->ioptions.file_checksum_gen_factory));
// TODO(tec) : If table_factory is using compressed block cache, we will
// be adding the external sst file blocks into it, which is wasteful.

Loading…
Cancel
Save