Store FSSequentialFilePtr object in SequentialFileReader (#7190)

Summary:
This diff contains the following changes:
    1. Replace the `FSSequentialFile` pointer in `SequentialFileReader` with an `FSSequentialFilePtr` object that wraps the `FSSequentialFile` pointer.

Objective: If tracing is enabled, `FSSequentialFilePtr` returns a `FSSequentialFileTracingWrapper` pointer, which captures all necessary information in an `IORecord`, calls the underlying file, and invokes `IOTracer` to dump that record to a binary file. If tracing is disabled, the underlying `FSSequentialFile` pointer is returned directly. The `FSSequentialFilePtr` wrapper class is added so that `FSSequentialFileTracingWrapper` is bypassed when tracing is disabled.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7190

Test Plan:
make check -j64
          COMPILE_WITH_TSAN=1 make check -j64

Reviewed By: anand1976

Differential Revision: D23059616

Pulled By: akankshamahajan15

fbshipit-source-id: 1564b94dd1297cd0fbfe2ed5c9cc3e20f7395301
main
Akanksha Mahajan 4 years ago committed by Facebook GitHub Bot
parent e6e2f3699c
commit cc24ac14eb
  1. 3
      db/db_impl/db_impl_open.cc
  2. 3
      db/db_impl/db_impl_secondary.cc
  3. 2
      db/external_sst_file_ingestion_job.cc
  4. 4
      db/external_sst_file_ingestion_job.h
  5. 2
      db/import_column_family_job.cc
  6. 4
      db/import_column_family_job.h
  7. 3
      db/transaction_log_impl.cc
  8. 15
      db/version_set.cc
  9. 2
      db/wal_manager.cc
  10. 27
      env/file_system_tracer.h
  11. 7
      file/file_util.cc
  12. 4
      file/file_util.h
  13. 23
      file/sequence_file_reader.h

@ -861,7 +861,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
} }
} }
file_reader.reset(new SequentialFileReader( file_reader.reset(new SequentialFileReader(
std::move(file), fname, immutable_db_options_.log_readahead_size)); std::move(file), fname, immutable_db_options_.log_readahead_size,
io_tracer_));
} }
// Create the log reader. // Create the log reader.

@ -153,7 +153,8 @@ Status DBImplSecondary::MaybeInitLogReader(
return status; return status;
} }
file_reader.reset(new SequentialFileReader( file_reader.reset(new SequentialFileReader(
std::move(file), fname, immutable_db_options_.log_readahead_size)); std::move(file), fname, immutable_db_options_.log_readahead_size,
io_tracer_));
} }
// Create the log reader. // Create the log reader.

@ -137,7 +137,7 @@ Status ExternalSstFileIngestionJob::Prepare(
nullptr); nullptr);
// CopyFile also sync the new file. // CopyFile also sync the new file.
status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0, status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
db_options_.use_fsync); db_options_.use_fsync, io_tracer_);
} }
TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded"); TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
if (!status.ok()) { if (!status.ok()) {

@ -90,7 +90,8 @@ class ExternalSstFileIngestionJob {
directories_(directories), directories_(directories),
event_logger_(event_logger), event_logger_(event_logger),
job_start_time_(env_->NowMicros()), job_start_time_(env_->NowMicros()),
consumed_seqno_count_(0) { consumed_seqno_count_(0),
io_tracer_(io_tracer) {
assert(directories != nullptr); assert(directories != nullptr);
} }
@ -188,6 +189,7 @@ class ExternalSstFileIngestionJob {
// Set in ExternalSstFileIngestionJob::Prepare(), if true and DB // Set in ExternalSstFileIngestionJob::Prepare(), if true and DB
// file_checksum_gen_factory is set, DB will generate checksum each file. // file_checksum_gen_factory is set, DB will generate checksum each file.
bool need_generate_file_checksum_{true}; bool need_generate_file_checksum_{true};
std::shared_ptr<IOTracer> io_tracer_;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -101,7 +101,7 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
} }
if (!hardlink_files) { if (!hardlink_files) {
status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0, status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
db_options_.use_fsync); db_options_.use_fsync, io_tracer_);
} }
if (!status.ok()) { if (!status.ok()) {
break; break;

@ -33,7 +33,8 @@ class ImportColumnFamilyJob {
fs_(db_options_.fs, io_tracer), fs_(db_options_.fs, io_tracer),
env_options_(env_options), env_options_(env_options),
import_options_(import_options), import_options_(import_options),
metadata_(metadata) {} metadata_(metadata),
io_tracer_(io_tracer) {}
// Prepare the job by copying external files into the DB. // Prepare the job by copying external files into the DB.
Status Prepare(uint64_t next_file_number, SuperVersion* sv); Status Prepare(uint64_t next_file_number, SuperVersion* sv);
@ -68,6 +69,7 @@ class ImportColumnFamilyJob {
VersionEdit edit_; VersionEdit edit_;
const ImportColumnFamilyOptions& import_options_; const ImportColumnFamilyOptions& import_options_;
std::vector<LiveFileMetaData> metadata_; std::vector<LiveFileMetaData> metadata_;
const std::shared_ptr<IOTracer> io_tracer_;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -63,7 +63,8 @@ Status TransactionLogIteratorImpl::OpenLogFile(
} }
} }
if (s.ok()) { if (s.ok()) {
file_reader->reset(new SequentialFileReader(std::move(file), fname)); file_reader->reset(
new SequentialFileReader(std::move(file), fname, io_tracer_));
} }
return s; return s;
} }

@ -4558,7 +4558,7 @@ Status VersionSet::Recover(
} }
manifest_file_reader.reset( manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file), manifest_path, new SequentialFileReader(std::move(manifest_file), manifest_path,
db_options_->log_readahead_size)); db_options_->log_readahead_size, io_tracer_));
} }
VersionBuilderMap builders; VersionBuilderMap builders;
@ -4840,7 +4840,7 @@ Status VersionSet::TryRecoverFromOneManifest(
} }
manifest_file_reader.reset( manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file), manifest_path, new SequentialFileReader(std::move(manifest_file), manifest_path,
db_options_->log_readahead_size)); db_options_->log_readahead_size, io_tracer_));
} }
assert(s.ok()); assert(s.ok());
@ -4881,7 +4881,8 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
file_reader.reset(new SequentialFileReader(std::move(file), manifest_path)); file_reader.reset(new SequentialFileReader(std::move(file), manifest_path,
nullptr /*IOTracer*/));
} }
std::map<uint32_t, std::string> column_family_names; std::map<uint32_t, std::string> column_family_names;
@ -5070,7 +5071,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
return s; return s;
} }
file_reader.reset(new SequentialFileReader( file_reader.reset(new SequentialFileReader(
std::move(file), dscname, db_options_->log_readahead_size)); std::move(file), dscname, db_options_->log_readahead_size, io_tracer_));
} }
bool have_prev_log_number = false; bool have_prev_log_number = false;
@ -6368,9 +6369,9 @@ Status ReactiveVersionSet::MaybeSwitchManifest(
} }
std::unique_ptr<SequentialFileReader> manifest_file_reader; std::unique_ptr<SequentialFileReader> manifest_file_reader;
if (s.ok()) { if (s.ok()) {
manifest_file_reader.reset( manifest_file_reader.reset(new SequentialFileReader(
new SequentialFileReader(std::move(manifest_file), manifest_path, std::move(manifest_file), manifest_path,
db_options_->log_readahead_size)); db_options_->log_readahead_size, io_tracer_));
manifest_reader->reset(new log::FragmentBufferedReader( manifest_reader->reset(new log::FragmentBufferedReader(
nullptr, std::move(manifest_file_reader), reporter, nullptr, std::move(manifest_file_reader), reporter,
true /* checksum */, 0 /* log_number */)); true /* checksum */, 0 /* log_number */));

@ -467,7 +467,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
fs_->OptimizeForLogRead(file_options_), fs_->OptimizeForLogRead(file_options_),
&file, nullptr); &file, nullptr);
std::unique_ptr<SequentialFileReader> file_reader( std::unique_ptr<SequentialFileReader> file_reader(
new SequentialFileReader(std::move(file), fname)); new SequentialFileReader(std::move(file), fname, io_tracer_));
if (!status.ok()) { if (!status.ok()) {
return status; return status;

@ -131,26 +131,33 @@ class FSSequentialFileTracingWrapper : public FSSequentialFileWrapper {
// FSSequentialFileTracingWrapper when tracing is disabled. // FSSequentialFileTracingWrapper when tracing is disabled.
class FSSequentialFilePtr { class FSSequentialFilePtr {
public: public:
FSSequentialFilePtr(FSSequentialFile* fs, std::shared_ptr<IOTracer> io_tracer) FSSequentialFilePtr() {}
: fs_(fs), io_tracer_(io_tracer) { FSSequentialFilePtr(std::unique_ptr<FSSequentialFile>&& fs,
fs_tracer_ = new FSSequentialFileTracingWrapper(fs_, io_tracer_); const std::shared_ptr<IOTracer>& io_tracer)
: fs_(std::move(fs)), io_tracer_(io_tracer) {
fs_tracer_.reset(new FSSequentialFileTracingWrapper(fs_.get(), io_tracer_));
} }
explicit FSSequentialFilePtr(FSSequentialFile* fs)
: fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
FSSequentialFile* operator->() const { FSSequentialFile* operator->() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) { if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_; return fs_tracer_.get();
} else { } else {
return fs_; return fs_.get();
}
}
FSSequentialFile* get() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_.get();
} else {
return fs_.get();
} }
} }
private: private:
FSSequentialFile* fs_; std::unique_ptr<FSSequentialFile> fs_;
std::shared_ptr<IOTracer> io_tracer_; std::shared_ptr<IOTracer> io_tracer_;
FSSequentialFileTracingWrapper* fs_tracer_; std::unique_ptr<FSSequentialFileTracingWrapper> fs_tracer_;
}; };
// FSRandomAccessFileTracingWrapper is a wrapper class above FSRandomAccessFile // FSRandomAccessFileTracingWrapper is a wrapper class above FSRandomAccessFile

@ -18,8 +18,8 @@ namespace ROCKSDB_NAMESPACE {
// Utility function to copy a file up to a specified length // Utility function to copy a file up to a specified length
IOStatus CopyFile(FileSystem* fs, const std::string& source, IOStatus CopyFile(FileSystem* fs, const std::string& source,
const std::string& destination, uint64_t size, const std::string& destination, uint64_t size, bool use_fsync,
bool use_fsync) { const std::shared_ptr<IOTracer>& io_tracer) {
const FileOptions soptions; const FileOptions soptions;
IOStatus io_s; IOStatus io_s;
std::unique_ptr<SequentialFileReader> src_reader; std::unique_ptr<SequentialFileReader> src_reader;
@ -44,7 +44,8 @@ IOStatus CopyFile(FileSystem* fs, const std::string& source,
return io_s; return io_s;
} }
} }
src_reader.reset(new SequentialFileReader(std::move(srcfile), source)); src_reader.reset(
new SequentialFileReader(std::move(srcfile), source, io_tracer));
dest_writer.reset( dest_writer.reset(
new WritableFileWriter(std::move(destfile), destination, soptions)); new WritableFileWriter(std::move(destfile), destination, soptions));
} }

@ -13,13 +13,15 @@
#include "rocksdb/sst_file_writer.h" #include "rocksdb/sst_file_writer.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/types.h" #include "rocksdb/types.h"
#include "trace_replay/io_tracer.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
// use_fsync maps to options.use_fsync, which determines the way that // use_fsync maps to options.use_fsync, which determines the way that
// the file is synced after copying. // the file is synced after copying.
extern IOStatus CopyFile(FileSystem* fs, const std::string& source, extern IOStatus CopyFile(FileSystem* fs, const std::string& source,
const std::string& destination, uint64_t size, const std::string& destination, uint64_t size,
bool use_fsync); bool use_fsync,
const std::shared_ptr<IOTracer>& io_tracer = nullptr);
extern IOStatus CreateFile(FileSystem* fs, const std::string& destination, extern IOStatus CreateFile(FileSystem* fs, const std::string& destination,
const std::string& contents, bool use_fsync); const std::string& contents, bool use_fsync);

@ -10,6 +10,8 @@
#pragma once #pragma once
#include <atomic> #include <atomic>
#include <string> #include <string>
#include "env/file_system_tracer.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/file_system.h" #include "rocksdb/file_system.h"
@ -21,20 +23,23 @@ namespace ROCKSDB_NAMESPACE {
// cache disabled) reads appropriately, and also updates the IO stats. // cache disabled) reads appropriately, and also updates the IO stats.
class SequentialFileReader { class SequentialFileReader {
private: private:
std::unique_ptr<FSSequentialFile> file_;
std::string file_name_; std::string file_name_;
FSSequentialFilePtr file_;
std::atomic<size_t> offset_{0}; // read offset std::atomic<size_t> offset_{0}; // read offset
public: public:
explicit SequentialFileReader(std::unique_ptr<FSSequentialFile>&& _file, explicit SequentialFileReader(
const std::string& _file_name) std::unique_ptr<FSSequentialFile>&& _file, const std::string& _file_name,
: file_(std::move(_file)), file_name_(_file_name) {} const std::shared_ptr<IOTracer>& io_tracer = nullptr)
: file_name_(_file_name), file_(std::move(_file), io_tracer) {}
explicit SequentialFileReader(std::unique_ptr<FSSequentialFile>&& _file, explicit SequentialFileReader(
const std::string& _file_name, std::unique_ptr<FSSequentialFile>&& _file, const std::string& _file_name,
size_t _readahead_size) size_t _readahead_size,
: file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size)), const std::shared_ptr<IOTracer>& io_tracer = nullptr)
file_name_(_file_name) {} : file_name_(_file_name),
file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size),
io_tracer) {}
SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o); *this = std::move(o);

Loading…
Cancel
Save