Add file name info to SequentialFileReader. (#4026)

Summary:
We potentially need this information for tracing, profiling and diagnosis.
Closes https://github.com/facebook/rocksdb/pull/4026

Differential Revision: D8555214

Pulled By: riversand963

fbshipit-source-id: 4263e06c00b6d5410b46aa46eb4e358ff2161dd2
main
Yanqin Jin 7 years ago committed by Facebook Github Bot
parent 14cee194d6
commit 524c6e6b72
  1. 3
      db/db_impl.cc
  2. 2
      db/db_impl_open.cc
  3. 12
      db/log_test.cc
  4. 2
      db/repair.cc
  5. 7
      db/transaction_log_impl.cc
  6. 6
      db/version_set.cc
  7. 2
      db/wal_manager.cc
  8. 6
      port/win/win_jemalloc.cc
  9. 3
      tools/ldb_cmd.cc
  10. 8
      util/file_reader_writer.h
  11. 2
      util/file_util.cc
  12. 5
      util/testutil.cc
  13. 3
      util/testutil.h
  14. 6
      utilities/backupable/backupable_db.cc
  15. 5
      utilities/blob_db/blob_file.cc

@ -2368,7 +2368,8 @@ Status DBImpl::GetDbIdentity(std::string& identity) const {
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
id_file_reader.reset(new SequentialFileReader(std::move(idfile))); id_file_reader.reset(
new SequentialFileReader(std::move(idfile), idfilename));
} }
uint64_t file_size; uint64_t file_size;

@ -577,7 +577,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
continue; continue;
} }
} }
file_reader.reset(new SequentialFileReader(std::move(file))); file_reader.reset(new SequentialFileReader(std::move(file), fname));
} }
// Create the log reader. // Create the log reader.

@ -160,8 +160,8 @@ class LogTest : public ::testing::TestWithParam<int> {
: reader_contents_(), : reader_contents_(),
dest_holder_(test::GetWritableFileWriter( dest_holder_(test::GetWritableFileWriter(
new test::StringSink(&reader_contents_))), new test::StringSink(&reader_contents_))),
source_holder_( source_holder_(test::GetSequentialFileReader(
test::GetSequentialFileReader(new StringSource(reader_contents_))), new StringSource(reader_contents_), "" /* file name */)),
writer_(std::move(dest_holder_), 123, GetParam()), writer_(std::move(dest_holder_), 123, GetParam()),
reader_(nullptr, std::move(source_holder_), &report_, true /*checksum*/, reader_(nullptr, std::move(source_holder_), &report_, true /*checksum*/,
0 /*initial_offset*/, 123) { 0 /*initial_offset*/, 123) {
@ -268,8 +268,8 @@ class LogTest : public ::testing::TestWithParam<int> {
void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) { void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
WriteInitialOffsetLog(); WriteInitialOffsetLog();
unique_ptr<SequentialFileReader> file_reader( unique_ptr<SequentialFileReader> file_reader(test::GetSequentialFileReader(
test::GetSequentialFileReader(new StringSource(reader_contents_))); new StringSource(reader_contents_), "" /* fname */));
unique_ptr<Reader> offset_reader( unique_ptr<Reader> offset_reader(
new Reader(nullptr, std::move(file_reader), &report_, new Reader(nullptr, std::move(file_reader), &report_,
true /*checksum*/, WrittenBytes() + offset_past_end, 123)); true /*checksum*/, WrittenBytes() + offset_past_end, 123));
@ -281,8 +281,8 @@ class LogTest : public ::testing::TestWithParam<int> {
void CheckInitialOffsetRecord(uint64_t initial_offset, void CheckInitialOffsetRecord(uint64_t initial_offset,
int expected_record_offset) { int expected_record_offset) {
WriteInitialOffsetLog(); WriteInitialOffsetLog();
unique_ptr<SequentialFileReader> file_reader( unique_ptr<SequentialFileReader> file_reader(test::GetSequentialFileReader(
test::GetSequentialFileReader(new StringSource(reader_contents_))); new StringSource(reader_contents_), "" /* fname */));
unique_ptr<Reader> offset_reader( unique_ptr<Reader> offset_reader(
new Reader(nullptr, std::move(file_reader), &report_, new Reader(nullptr, std::move(file_reader), &report_,
true /*checksum*/, initial_offset, 123)); true /*checksum*/, initial_offset, 123));

@ -341,7 +341,7 @@ class Repairer {
return status; return status;
} }
unique_ptr<SequentialFileReader> lfile_reader( unique_ptr<SequentialFileReader> lfile_reader(
new SequentialFileReader(std::move(lfile))); new SequentialFileReader(std::move(lfile), logname));
// Create the log reader. // Create the log reader.
LogReporter reporter; LogReporter reporter;

@ -46,13 +46,14 @@ Status TransactionLogIteratorImpl::OpenLogFile(
const LogFile* logFile, unique_ptr<SequentialFileReader>* file_reader) { const LogFile* logFile, unique_ptr<SequentialFileReader>* file_reader) {
Env* env = options_->env; Env* env = options_->env;
unique_ptr<SequentialFile> file; unique_ptr<SequentialFile> file;
std::string fname;
Status s; Status s;
EnvOptions optimized_env_options = env->OptimizeForLogRead(soptions_); EnvOptions optimized_env_options = env->OptimizeForLogRead(soptions_);
if (logFile->Type() == kArchivedLogFile) { if (logFile->Type() == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber()); fname = ArchivedLogFileName(dir_, logFile->LogNumber());
s = env->NewSequentialFile(fname, &file, optimized_env_options); s = env->NewSequentialFile(fname, &file, optimized_env_options);
} else { } else {
std::string fname = LogFileName(dir_, logFile->LogNumber()); fname = LogFileName(dir_, logFile->LogNumber());
s = env->NewSequentialFile(fname, &file, optimized_env_options); s = env->NewSequentialFile(fname, &file, optimized_env_options);
if (!s.ok()) { if (!s.ok()) {
// If cannot open file in DB directory. // If cannot open file in DB directory.
@ -62,7 +63,7 @@ Status TransactionLogIteratorImpl::OpenLogFile(
} }
} }
if (s.ok()) { if (s.ok()) {
file_reader->reset(new SequentialFileReader(std::move(file))); file_reader->reset(new SequentialFileReader(std::move(file), fname));
} }
return s; return s;
} }

@ -3106,7 +3106,7 @@ Status VersionSet::Recover(
return s; return s;
} }
manifest_file_reader.reset( manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file))); new SequentialFileReader(std::move(manifest_file), manifest_filename));
} }
uint64_t current_manifest_file_size; uint64_t current_manifest_file_size;
s = env_->GetFileSize(manifest_filename, &current_manifest_file_size); s = env_->GetFileSize(manifest_filename, &current_manifest_file_size);
@ -3416,7 +3416,7 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
file_reader.reset(new SequentialFileReader(std::move(file))); file_reader.reset(new SequentialFileReader(std::move(file), dscname));
} }
std::map<uint32_t, std::string> column_family_names; std::map<uint32_t, std::string> column_family_names;
@ -3560,7 +3560,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
file_reader.reset(new SequentialFileReader(std::move(file))); file_reader.reset(new SequentialFileReader(std::move(file), dscname));
} }
bool have_prev_log_number = false; bool have_prev_log_number = false;

@ -444,7 +444,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
Status status = env_->NewSequentialFile( Status status = env_->NewSequentialFile(
fname, &file, env_->OptimizeForLogRead(env_options_)); fname, &file, env_->OptimizeForLogRead(env_options_));
unique_ptr<SequentialFileReader> file_reader( unique_ptr<SequentialFileReader> file_reader(
new SequentialFileReader(std::move(file))); new SequentialFileReader(std::move(file), fname));
if (!status.ok()) { if (!status.ok()) {
return status; return status;

@ -37,12 +37,10 @@ ZSTD_customMem GetJeZstdAllocationOverrides() {
// Global operators to be replaced by a linker when this file is // Global operators to be replaced by a linker when this file is
// a part of the build // a part of the build
void* jemalloc_aligned_alloc( size_t size, size_t alignment) ROCKSDB_NOEXCEPT { void* jemalloc_aligned_alloc(size_t size, size_t alignment) ROCKSDB_NOEXCEPT {
return je_aligned_alloc(alignment, size); return je_aligned_alloc(alignment, size);
} }
void jemalloc_aligned_free(void* p) ROCKSDB_NOEXCEPT { void jemalloc_aligned_free(void* p) ROCKSDB_NOEXCEPT { je_free(p); }
je_free(p);
}
void* operator new(size_t size) { void* operator new(size_t size) {
void* p = je_malloc(size); void* p = je_malloc(size);

@ -1962,7 +1962,8 @@ void DumpWalFile(std::string wal_file, bool print_header, bool print_values,
unique_ptr<SequentialFile> file; unique_ptr<SequentialFile> file;
status = env_->NewSequentialFile(wal_file, &file, soptions); status = env_->NewSequentialFile(wal_file, &file, soptions);
if (status.ok()) { if (status.ok()) {
wal_file_reader.reset(new SequentialFileReader(std::move(file))); wal_file_reader.reset(
new SequentialFileReader(std::move(file), wal_file));
} }
} }
if (!status.ok()) { if (!status.ok()) {

@ -26,11 +26,13 @@ std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
class SequentialFileReader { class SequentialFileReader {
private: private:
std::unique_ptr<SequentialFile> file_; std::unique_ptr<SequentialFile> file_;
std::string file_name_;
std::atomic<size_t> offset_; // read offset std::atomic<size_t> offset_; // read offset
public: public:
explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file) explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file,
: file_(std::move(_file)), offset_(0) {} const std::string& _file_name)
: file_(std::move(_file)), file_name_(_file_name), offset_(0) {}
SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o); *this = std::move(o);
@ -52,6 +54,8 @@ class SequentialFileReader {
SequentialFile* file() { return file_.get(); } SequentialFile* file() { return file_.get(); }
std::string file_name() { return file_name_; }
bool use_direct_io() const { return file_->use_direct_io(); } bool use_direct_io() const { return file_->use_direct_io(); }
}; };

@ -41,7 +41,7 @@ Status CopyFile(Env* env, const std::string& source,
return s; return s;
} }
} }
src_reader.reset(new SequentialFileReader(std::move(srcfile))); src_reader.reset(new SequentialFileReader(std::move(srcfile), source));
dest_writer.reset(new WritableFileWriter(std::move(destfile), soptions)); dest_writer.reset(new WritableFileWriter(std::move(destfile), soptions));
} }

@ -135,9 +135,10 @@ RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf) {
"[test RandomAccessFileReader]"); "[test RandomAccessFileReader]");
} }
SequentialFileReader* GetSequentialFileReader(SequentialFile* se) { SequentialFileReader* GetSequentialFileReader(SequentialFile* se,
const std::string& fname) {
unique_ptr<SequentialFile> file(se); unique_ptr<SequentialFile> file(se);
return new SequentialFileReader(std::move(file)); return new SequentialFileReader(std::move(file), fname);
} }
void CorruptKeyType(InternalKey* ikey) { void CorruptKeyType(InternalKey* ikey) {

@ -187,7 +187,8 @@ extern WritableFileWriter* GetWritableFileWriter(WritableFile* wf);
extern RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf); extern RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf);
extern SequentialFileReader* GetSequentialFileReader(SequentialFile* se); extern SequentialFileReader* GetSequentialFileReader(SequentialFile* se,
const std::string& fname);
class StringSink: public WritableFile { class StringSink: public WritableFile {
public: public:

@ -1218,7 +1218,7 @@ Status BackupEngineImpl::CopyOrCreateFile(
unique_ptr<SequentialFileReader> src_reader; unique_ptr<SequentialFileReader> src_reader;
unique_ptr<char[]> buf; unique_ptr<char[]> buf;
if (!src.empty()) { if (!src.empty()) {
src_reader.reset(new SequentialFileReader(std::move(src_file))); src_reader.reset(new SequentialFileReader(std::move(src_file), src));
buf.reset(new char[copy_file_buffer_size_]); buf.reset(new char[copy_file_buffer_size_]);
} }
@ -1417,7 +1417,7 @@ Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
} }
unique_ptr<SequentialFileReader> src_reader( unique_ptr<SequentialFileReader> src_reader(
new SequentialFileReader(std::move(src_file))); new SequentialFileReader(std::move(src_file), src));
std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]); std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
Slice data; Slice data;
@ -1641,7 +1641,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
} }
unique_ptr<SequentialFileReader> backup_meta_reader( unique_ptr<SequentialFileReader> backup_meta_reader(
new SequentialFileReader(std::move(backup_meta_file))); new SequentialFileReader(std::move(backup_meta_file), meta_filename_));
unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]); unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
Slice data; Slice data;
s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get()); s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());

@ -84,14 +84,15 @@ std::shared_ptr<Reader> BlobFile::OpenSequentialReader(
Env* env, const DBOptions& db_options, Env* env, const DBOptions& db_options,
const EnvOptions& env_options) const { const EnvOptions& env_options) const {
std::unique_ptr<SequentialFile> sfile; std::unique_ptr<SequentialFile> sfile;
Status s = env->NewSequentialFile(PathName(), &sfile, env_options); std::string path_name(PathName());
Status s = env->NewSequentialFile(path_name, &sfile, env_options);
if (!s.ok()) { if (!s.ok()) {
// report something here. // report something here.
return nullptr; return nullptr;
} }
std::unique_ptr<SequentialFileReader> sfile_reader; std::unique_ptr<SequentialFileReader> sfile_reader;
sfile_reader.reset(new SequentialFileReader(std::move(sfile))); sfile_reader.reset(new SequentialFileReader(std::move(sfile), path_name));
std::shared_ptr<Reader> log_reader = std::make_shared<Reader>( std::shared_ptr<Reader> log_reader = std::make_shared<Reader>(
std::move(sfile_reader), db_options.env, db_options.statistics.get()); std::move(sfile_reader), db_options.env, db_options.statistics.get());

Loading…
Cancel
Save