From c1a65a4de4e53dac192f9c78771360db20eed9ed Mon Sep 17 00:00:00 2001 From: mrambacher Date: Mon, 4 Jan 2021 15:59:52 -0800 Subject: [PATCH] Make StringEnv, StringSink, StringSource use FS classes (#7786) Summary: Change the StringEnv and related classes to be based on FileSystem APIs rather than the corresponding Env ones. The StringSink and StringSource classes were changed to be based on the corresponding FS file classes. Part of a cleanup to use the newer interfaces. This change also eliminates some of the casts/wrappers to LegacyFile classes. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7786 Reviewed By: jay-zhuang Differential Revision: D25761460 Pulled By: anand1976 fbshipit-source-id: 428ae8e32b3db97dbeeca08c9d3bb0d9d4d3a38f --- db/log_test.cc | 149 +++--- db/plain_table_db_test.cc | 4 +- db/table_properties_collector_test.cc | 28 +- file/readahead_raf.cc | 43 +- file/readahead_raf.h | 10 +- options/options_test.cc | 56 +-- .../block_based/data_block_hash_index_test.cc | 19 +- table/table_test.cc | 106 ++-- test_util/testutil.cc | 19 - test_util/testutil.h | 462 ++++++++++-------- util/file_reader_writer_test.cc | 20 +- utilities/blob_db/blob_dump_tool.cc | 20 +- 12 files changed, 489 insertions(+), 447 deletions(-) diff --git a/db/log_test.cc b/db/log_test.cc index 8289fdb7e..2e993d8f9 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -9,7 +9,6 @@ #include "db/log_reader.h" #include "db/log_writer.h" -#include "env/composite_env_wrapper.h" #include "file/sequence_file_reader.h" #include "file/writable_file_writer.h" #include "rocksdb/env.h" @@ -50,7 +49,7 @@ static std::string RandomSkewedString(int i, Random* rnd) { // get<1>(tuple): true if allow retry after read EOF, false otherwise class LogTest : public ::testing::TestWithParam> { private: - class StringSource : public SequentialFile { + class StringSource : public FSSequentialFile { public: Slice& contents_; bool force_error_; @@ -68,7 +67,8 @@ class LogTest : public ::testing::TestWithParam> { returned_partial_(false), fail_after_read_partial_(fail_after_read_partial) {} - Status Read(size_t n, Slice* result, char* scratch) override { + IOStatus Read(size_t n, const IOOptions& /*opts*/, Slice* result, + char* scratch, IODebugContext* /*dbg*/) override { if (fail_after_read_partial_) { EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error"; } @@ -81,7 +81,7 @@ class LogTest : public ::testing::TestWithParam> { contents_.remove_prefix(force_error_position_); force_error_ = false; returned_partial_ = true; - return Status::Corruption("read error"); + return IOStatus::Corruption("read error"); } } @@ -106,28 +106,21 @@ class LogTest : public ::testing::TestWithParam> { *result = Slice(scratch, n); contents_.remove_prefix(n); - return Status::OK(); + return IOStatus::OK(); } - Status Skip(uint64_t n) override { + IOStatus Skip(uint64_t n) override { if (n > contents_.size()) { contents_.clear(); - return Status::NotFound("in-memory file skipepd past end"); + return IOStatus::NotFound("in-memory file skipepd past end"); } contents_.remove_prefix(n); - return Status::OK(); + return IOStatus::OK(); } }; - inline StringSource* GetStringSourceFromLegacyReader( - SequentialFileReader* reader) { - LegacySequentialFileWrapper* file = - static_cast(reader->file()); - return static_cast(file->target()); - } - class ReportCollector : public Reader::Reporter { public: size_t dropped_bytes_; @@ -140,29 +133,17 @@ class LogTest : public ::testing::TestWithParam> { } }; - std::string& dest_contents() { - auto dest = test::GetStringSinkFromLegacyWriter(writer_.file()); - assert(dest); - return dest->contents_; - } + std::string& dest_contents() { return sink_->contents_; } - const std::string& dest_contents() const { - auto dest = test::GetStringSinkFromLegacyWriter(writer_.file()); - assert(dest); - return dest->contents_; - } + const std::string& dest_contents() const { return sink_->contents_; } - void reset_source_contents() { - auto src = GetStringSourceFromLegacyReader(reader_->file()); - assert(src); - src->contents_ = dest_contents(); - } + void reset_source_contents() { source_->contents_ = dest_contents(); } Slice reader_contents_; - std::unique_ptr dest_holder_; - std::unique_ptr source_holder_; + test::StringSink* sink_; + StringSource* source_; ReportCollector report_; - Writer writer_; + std::unique_ptr writer_; std::unique_ptr reader_; protected: @@ -171,19 +152,23 @@ class LogTest : public ::testing::TestWithParam> { public: LogTest() : reader_contents_(), - dest_holder_(test::GetWritableFileWriter( - new test::StringSink(&reader_contents_), "" /* don't care */)), - source_holder_(test::GetSequentialFileReader( - new StringSource(reader_contents_, !std::get<1>(GetParam())), - "" /* file name */)), - writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())), + sink_(new test::StringSink(&reader_contents_)), + source_(new StringSource(reader_contents_, !std::get<1>(GetParam()))), allow_retry_read_(std::get<1>(GetParam())) { + std::unique_ptr sink_holder(sink_); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(sink_holder), "" /* don't care */, FileOptions())); + writer_.reset( + new Writer(std::move(file_writer), 123, std::get<0>(GetParam()))); + std::unique_ptr source_holder(source_); + std::unique_ptr file_reader( + new SequentialFileReader(std::move(source_holder), "" /* file name */)); if (allow_retry_read_) { - reader_.reset(new FragmentBufferedReader( - nullptr, std::move(source_holder_), &report_, true /* checksum */, - 123 /* log_number */)); + reader_.reset(new FragmentBufferedReader(nullptr, std::move(file_reader), + &report_, true /* checksum */, + 123 /* log_number */)); } else { - reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_, + reader_.reset(new Reader(nullptr, std::move(file_reader), &report_, true /* checksum */, 123 /* log_number */)); } } @@ -191,7 +176,7 @@ class LogTest : public ::testing::TestWithParam> { Slice* get_reader_contents() { return &reader_contents_; } void Write(const std::string& msg) { - ASSERT_OK(writer_.AddRecord(Slice(msg))); + ASSERT_OK(writer_->AddRecord(Slice(msg))); } size_t WrittenBytes() const { @@ -219,11 +204,7 @@ class LogTest : public ::testing::TestWithParam> { dest_contents()[offset] = new_byte; } - void ShrinkSize(int bytes) { - auto dest = test::GetStringSinkFromLegacyWriter(writer_.file()); - assert(dest); - dest->Drop(bytes); - } + void ShrinkSize(int bytes) { sink_->Drop(bytes); } void FixChecksum(int header_offset, int len, bool recyclable) { // Compute crc of type/len/data @@ -235,9 +216,8 @@ class LogTest : public ::testing::TestWithParam> { } void ForceError(size_t position = 0) { - auto src = GetStringSourceFromLegacyReader(reader_->file()); - src->force_error_ = true; - src->force_error_position_ = position; + source_->force_error_ = true; + source_->force_error_position_ = position; } size_t DroppedBytes() const { @@ -249,14 +229,12 @@ class LogTest : public ::testing::TestWithParam> { } void ForceEOF(size_t position = 0) { - auto src = GetStringSourceFromLegacyReader(reader_->file()); - src->force_eof_ = true; - src->force_eof_position_ = position; + source_->force_eof_ = true; + source_->force_eof_position_ = position; } void UnmarkEOF() { - auto src = GetStringSourceFromLegacyReader(reader_->file()); - src->returned_partial_ = false; + source_->returned_partial_ = false; reader_->UnmarkEOF(); } @@ -685,9 +663,10 @@ TEST_P(LogTest, Recycle) { while (get_reader_contents()->size() < log::kBlockSize * 2) { Write("xxxxxxxxxxxxxxxx"); } - std::unique_ptr dest_holder(test::GetWritableFileWriter( - new test::OverwritingStringSink(get_reader_contents()), - "" /* don't care */)); + std::unique_ptr sink( + new test::OverwritingStringSink(get_reader_contents())); + std::unique_ptr dest_holder(new WritableFileWriter( + std::move(sink), "" /* don't care */, FileOptions())); Writer recycle_writer(std::move(dest_holder), 123, true); ASSERT_OK(recycle_writer.AddRecord(Slice("foooo"))); ASSERT_OK(recycle_writer.AddRecord(Slice("bar"))); @@ -718,10 +697,9 @@ class RetriableLogTest : public ::testing::TestWithParam { }; Slice contents_; - std::unique_ptr dest_holder_; + test::StringSink* sink_; std::unique_ptr log_writer_; Env* env_; - EnvOptions env_options_; const std::string test_dir_; const std::string log_file_; std::unique_ptr writer_; @@ -732,55 +710,50 @@ class RetriableLogTest : public ::testing::TestWithParam { public: RetriableLogTest() : contents_(), - dest_holder_(nullptr), + sink_(new test::StringSink(&contents_)), log_writer_(nullptr), env_(Env::Default()), test_dir_(test::PerThreadDBPath("retriable_log_test")), log_file_(test_dir_ + "/log"), writer_(nullptr), reader_(nullptr), - log_reader_(nullptr) {} + log_reader_(nullptr) { + std::unique_ptr sink_holder(sink_); + std::unique_ptr wfw(new WritableFileWriter( + std::move(sink_holder), "" /* file name */, FileOptions())); + log_writer_.reset(new Writer(std::move(wfw), 123, GetParam())); + } Status SetupTestEnv() { - dest_holder_.reset(test::GetWritableFileWriter( - new test::StringSink(&contents_), "" /* file name */)); - assert(dest_holder_ != nullptr); - log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam())); - assert(log_writer_ != nullptr); - Status s; - s = env_->CreateDirIfMissing(test_dir_); - std::unique_ptr writable_file; + FileOptions fopts; + auto fs = env_->GetFileSystem(); + s = fs->CreateDirIfMissing(test_dir_, IOOptions(), nullptr); + std::unique_ptr writable_file; if (s.ok()) { - s = env_->NewWritableFile(log_file_, &writable_file, env_options_); + s = fs->NewWritableFile(log_file_, fopts, &writable_file, nullptr); } if (s.ok()) { - writer_.reset(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), log_file_, - env_options_)); - assert(writer_ != nullptr); + writer_.reset( + new WritableFileWriter(std::move(writable_file), log_file_, fopts)); + EXPECT_NE(writer_, nullptr); } - std::unique_ptr seq_file; + std::unique_ptr seq_file; if (s.ok()) { - s = env_->NewSequentialFile(log_file_, &seq_file, env_options_); + s = fs->NewSequentialFile(log_file_, fopts, &seq_file, nullptr); } if (s.ok()) { - reader_.reset(new SequentialFileReader( - NewLegacySequentialFileWrapper(seq_file), log_file_)); - assert(reader_ != nullptr); + reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_)); + EXPECT_NE(reader_, nullptr); log_reader_.reset(new FragmentBufferedReader( nullptr, std::move(reader_), &report_, true /* checksum */, 123 /* log_number */)); - assert(log_reader_ != nullptr); + EXPECT_NE(log_reader_, nullptr); } return s; } - std::string contents() { - auto file = test::GetStringSinkFromLegacyWriter(log_writer_->file()); - assert(file != nullptr); - return file->contents_; - } + std::string contents() { return sink_->contents_; } void Encode(const std::string& msg) { ASSERT_OK(log_writer_->AddRecord(Slice(msg))); diff --git a/db/plain_table_db_test.cc b/db/plain_table_db_test.cc index 0ea7e9900..379e6b6b1 100644 --- a/db/plain_table_db_test.cc +++ b/db/plain_table_db_test.cc @@ -49,9 +49,9 @@ TEST_F(PlainTableKeyDecoderTest, ReadNonMmap) { Slice contents(tmp); test::StringSource* string_source = new test::StringSource(contents, 0, false); - + std::unique_ptr holder(string_source); std::unique_ptr file_reader( - test::GetRandomAccessFileReader(string_source)); + new RandomAccessFileReader(std::move(holder), "test")); std::unique_ptr file_info( new PlainTableReaderFileInfo(std::move(file_reader), EnvOptions(), kLength)); diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index 56d7edefe..4777bdd5e 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -13,7 +13,6 @@ #include "db/db_impl/db_impl.h" #include "db/dbformat.h" -#include "env/composite_env_wrapper.h" #include "file/sequence_file_reader.h" #include "file/writable_file_writer.h" #include "options/cf_options.h" @@ -49,10 +48,9 @@ void MakeBuilder(const Options& options, const ImmutableCFOptions& ioptions, int_tbl_prop_collector_factories, std::unique_ptr* writable, std::unique_ptr* builder) { - std::unique_ptr wf(new test::StringSink); + std::unique_ptr wf(new test::StringSink); writable->reset( - new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)), - "" /* don't care */, EnvOptions())); + new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions())); int unknown_level = -1; builder->reset(NewTableBuilder( ioptions, moptions, internal_comparator, int_tbl_prop_collector_factories, @@ -286,12 +284,13 @@ void TestCustomizedTablePropertiesCollector( writer->Flush(); // -- Step 2: Read properties - LegacyWritableFileWrapper* file = - static_cast(writer->writable_file()); - test::StringSink* fwf = static_cast(file->target()); + test::StringSink* fwf = + static_cast(writer->writable_file()); + std::unique_ptr source( + new test::StringSource(fwf->contents())); std::unique_ptr fake_file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(fwf->contents()))); + new RandomAccessFileReader(std::move(source), "test")); + TableProperties* props; Status s = ReadTableProperties(fake_file_reader.get(), fwf->contents().size(), magic_number, ioptions, &props, @@ -427,12 +426,13 @@ void TestInternalKeyPropertiesCollector( ASSERT_OK(builder->Finish()); writable->Flush(); - LegacyWritableFileWrapper* file = - static_cast(writable->writable_file()); - test::StringSink* fwf = static_cast(file->target()); + test::StringSink* fwf = + static_cast(writable->writable_file()); + std::unique_ptr source( + new test::StringSource(fwf->contents())); std::unique_ptr reader( - test::GetRandomAccessFileReader( - new test::StringSource(fwf->contents()))); + new RandomAccessFileReader(std::move(source), "test")); + TableProperties* props; Status s = ReadTableProperties(reader.get(), fwf->contents().size(), magic_number, diff --git a/file/readahead_raf.cc b/file/readahead_raf.cc index 493f9d9e8..6d346432e 100644 --- a/file/readahead_raf.cc +++ b/file/readahead_raf.cc @@ -11,15 +11,17 @@ #include #include + #include "file/read_write_util.h" +#include "rocksdb/file_system.h" #include "util/aligned_buffer.h" #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { namespace { -class ReadaheadRandomAccessFile : public RandomAccessFile { +class ReadaheadRandomAccessFile : public FSRandomAccessFile { public: - ReadaheadRandomAccessFile(std::unique_ptr&& file, + ReadaheadRandomAccessFile(std::unique_ptr&& file, size_t readahead_size) : file_(std::move(file)), alignment_(file_->GetRequiredBufferAlignment()), @@ -35,11 +37,12 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete; - Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override { + IOStatus Read(uint64_t offset, size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) const override { // Read-ahead only make sense if we have some slack left after reading if (n + alignment_ >= readahead_size_) { - return file_->Read(offset, n, result, scratch); + return file_->Read(offset, n, options, result, scratch, dbg); } std::unique_lock lk(lock_); @@ -53,14 +56,14 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { (cached_len == n || buffer_.CurrentSize() < readahead_size_)) { // We read exactly what we needed, or we hit end of file - return. *result = Slice(scratch, cached_len); - return Status::OK(); + return IOStatus::OK(); } size_t advanced_offset = static_cast(offset + cached_len); // In the case of cache hit advanced_offset is already aligned, means that // chunk_offset equals to advanced_offset size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset); - Status s = ReadIntoBuffer(chunk_offset, readahead_size_); + IOStatus s = ReadIntoBuffer(chunk_offset, readahead_size_, options, dbg); if (s.ok()) { // The data we need is now in cache, so we can safely read it size_t remaining_len; @@ -71,11 +74,12 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { return s; } - Status Prefetch(uint64_t offset, size_t n) override { + IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options, + IODebugContext* dbg) override { if (n < readahead_size_) { // Don't allow smaller prefetches than the configured `readahead_size_`. // `Read()` assumes a smaller prefetch buffer indicates EOF was reached. - return Status::OK(); + return IOStatus::OK(); } std::unique_lock lk(lock_); @@ -83,10 +87,11 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { size_t offset_ = static_cast(offset); size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_); if (prefetch_offset == buffer_offset_) { - return Status::OK(); + return IOStatus::OK(); } return ReadIntoBuffer(prefetch_offset, - Roundup(offset_ + n, alignment_) - prefetch_offset); + Roundup(offset_ + n, alignment_) - prefetch_offset, + options, dbg); } size_t GetUniqueId(char* id, size_t max_size) const override { @@ -95,7 +100,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { void Hint(AccessPattern pattern) override { file_->Hint(pattern); } - Status InvalidateCache(size_t offset, size_t length) override { + IOStatus InvalidateCache(size_t offset, size_t length) override { std::unique_lock lk(lock_); buffer_.Clear(); return file_->InvalidateCache(offset, length); @@ -125,14 +130,16 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { // Reads into buffer_ the next n bytes from file_ starting at offset. // Can actually read less if EOF was reached. // Returns the status of the read operastion on the file. - Status ReadIntoBuffer(uint64_t offset, size_t n) const { + IOStatus ReadIntoBuffer(uint64_t offset, size_t n, const IOOptions& options, + IODebugContext* dbg) const { if (n > buffer_.Capacity()) { n = buffer_.Capacity(); } assert(IsFileSectorAligned(offset, alignment_)); assert(IsFileSectorAligned(n, alignment_)); Slice result; - Status s = file_->Read(offset, n, &result, buffer_.BufferStart()); + IOStatus s = + file_->Read(offset, n, options, &result, buffer_.BufferStart(), dbg); if (s.ok()) { buffer_offset_ = offset; buffer_.Size(result.size()); @@ -141,7 +148,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { return s; } - const std::unique_ptr file_; + const std::unique_ptr file_; const size_t alignment_; const size_t readahead_size_; @@ -153,9 +160,9 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { }; } // namespace -std::unique_ptr NewReadaheadRandomAccessFile( - std::unique_ptr&& file, size_t readahead_size) { - std::unique_ptr result( +std::unique_ptr NewReadaheadRandomAccessFile( + std::unique_ptr&& file, size_t readahead_size) { + std::unique_ptr result( new ReadaheadRandomAccessFile(std::move(file), readahead_size)); return result; } diff --git a/file/readahead_raf.h b/file/readahead_raf.h index cbdcb124f..dfaf2b4fa 100644 --- a/file/readahead_raf.h +++ b/file/readahead_raf.h @@ -8,10 +8,12 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once -#include -#include "rocksdb/env.h" +#include + +#include "rocksdb/rocksdb_namespace.h" namespace ROCKSDB_NAMESPACE { +class FSRandomAccessFile; // This file provides the following main abstractions: // SequentialFileReader : wrapper over Env::SequentialFile // RandomAccessFileReader : wrapper over Env::RandomAccessFile @@ -22,6 +24,6 @@ namespace ROCKSDB_NAMESPACE { // NewReadaheadRandomAccessFile provides a wrapper over RandomAccessFile to // always prefetch additional data with every read. This is mainly used in // Compaction Table Readers. -std::unique_ptr NewReadaheadRandomAccessFile( - std::unique_ptr&& file, size_t readahead_size); +std::unique_ptr NewReadaheadRandomAccessFile( + std::unique_ptr&& file, size_t readahead_size); } // namespace ROCKSDB_NAMESPACE diff --git a/options/options_test.cc b/options/options_test.cc index a916e8568..b15be0206 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -2414,14 +2414,10 @@ TEST_F(OptionsOldApiTest, ColumnFamilyOptionsSerialization) { #ifndef ROCKSDB_LITE class OptionsParserTest : public testing::Test { public: - OptionsParserTest() { - env_.reset(new test::StringEnv(Env::Default())); - fs_.reset(new LegacyFileSystemWrapper(env_.get())); - } + OptionsParserTest() { fs_.reset(new test::StringFS(FileSystem::Default())); } protected: - std::unique_ptr env_; - std::unique_ptr fs_; + std::shared_ptr fs_; }; TEST_F(OptionsParserTest, Comment) { @@ -2450,7 +2446,7 @@ TEST_F(OptionsParserTest, Comment) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_OK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2481,7 +2477,7 @@ TEST_F(OptionsParserTest, ExtraSpace) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_OK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2499,7 +2495,7 @@ TEST_F(OptionsParserTest, MissingDBOptions) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2529,7 +2525,7 @@ TEST_F(OptionsParserTest, DoubleDBOptions) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2557,7 +2553,7 @@ TEST_F(OptionsParserTest, NoDefaultCFOptions) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2587,7 +2583,7 @@ TEST_F(OptionsParserTest, DefaultCFOptionsMustBeTheFirst) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2616,7 +2612,7 @@ TEST_F(OptionsParserTest, DuplicateCFOptions) { "[CFOptions \"something_else\"]\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2684,12 +2680,12 @@ TEST_F(OptionsParserTest, IgnoreUnknownOptions) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - auto s = env_->FileExists(kTestFileName); + auto s = fs_->FileExists(kTestFileName, IOOptions(), nullptr); ASSERT_TRUE(s.ok() || s.IsNotFound()); if (s.ok()) { - ASSERT_OK(env_->DeleteFile(kTestFileName)); + ASSERT_OK(fs_->DeleteFile(kTestFileName, IOOptions(), nullptr)); } - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK(parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2737,7 +2733,7 @@ TEST_F(OptionsParserTest, ParseVersion) { snprintf(buffer, kLength - 1, file_template.c_str(), iv.c_str()); parser.Reset(); - ASSERT_OK(env_->WriteToNewFile(iv, buffer)); + ASSERT_OK(fs_->WriteToNewFile(iv, buffer)); ASSERT_NOK(parser.Parse(iv, fs_.get(), false, 0 /* readahead_size */)); } @@ -2746,7 +2742,7 @@ TEST_F(OptionsParserTest, ParseVersion) { for (auto vv : valid_versions) { snprintf(buffer, kLength - 1, file_template.c_str(), vv.c_str()); parser.Reset(); - ASSERT_OK(env_->WriteToNewFile(vv, buffer)); + ASSERT_OK(fs_->WriteToNewFile(vv, buffer)); ASSERT_OK(parser.Parse(vv, fs_.get(), false, 0 /* readahead_size */)); } } @@ -2855,37 +2851,37 @@ TEST_F(OptionsParserTest, Readahead) { kOptionsFileName, fs_.get())); uint64_t file_size = 0; - ASSERT_OK(env_->GetFileSize(kOptionsFileName, &file_size)); + ASSERT_OK( + fs_->GetFileSize(kOptionsFileName, IOOptions(), &file_size, nullptr)); assert(file_size > 0); RocksDBOptionsParser parser; - env_->num_seq_file_read_ = 0; + fs_->num_seq_file_read_ = 0; size_t readahead_size = 128 * 1024; ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get(), false, readahead_size)); - ASSERT_EQ(env_->num_seq_file_read_.load(), + ASSERT_EQ(fs_->num_seq_file_read_.load(), (file_size - 1) / readahead_size + 1); - env_->num_seq_file_read_.store(0); + fs_->num_seq_file_read_.store(0); readahead_size = 1024 * 1024; ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get(), false, readahead_size)); - ASSERT_EQ(env_->num_seq_file_read_.load(), + ASSERT_EQ(fs_->num_seq_file_read_.load(), (file_size - 1) / readahead_size + 1); // Tiny readahead. 8 KB is read each time. - env_->num_seq_file_read_.store(0); + fs_->num_seq_file_read_.store(0); ASSERT_OK( parser.Parse(kOptionsFileName, fs_.get(), false, 1 /* readahead_size */)); - ASSERT_GE(env_->num_seq_file_read_.load(), file_size / (8 * 1024)); - ASSERT_LT(env_->num_seq_file_read_.load(), file_size / (8 * 1024) * 2); + ASSERT_GE(fs_->num_seq_file_read_.load(), file_size / (8 * 1024)); + ASSERT_LT(fs_->num_seq_file_read_.load(), file_size / (8 * 1024) * 2); // Disable readahead means 512KB readahead. - env_->num_seq_file_read_.store(0); + fs_->num_seq_file_read_.store(0); ASSERT_OK( parser.Parse(kOptionsFileName, fs_.get(), false, 0 /* readahead_size */)); - ASSERT_GE(env_->num_seq_file_read_.load(), - (file_size - 1) / (512 * 1024) + 1); + ASSERT_GE(fs_->num_seq_file_read_.load(), (file_size - 1) / (512 * 1024) + 1); } TEST_F(OptionsParserTest, DumpAndParse) { @@ -3083,7 +3079,7 @@ class OptionsSanityCheckTest : public OptionsParserTest { } Status PersistCFOptions(const ColumnFamilyOptions& cf_opts) { - Status s = env_->DeleteFile(kOptionsFileName); + Status s = fs_->DeleteFile(kOptionsFileName, IOOptions(), nullptr); if (!s.ok()) { return s; } diff --git a/table/block_based/data_block_hash_index_test.cc b/table/block_based/data_block_hash_index_test.cc index 7ce296318..0a4276fd3 100644 --- a/table/block_based/data_block_hash_index_test.cc +++ b/table/block_based/data_block_hash_index_test.cc @@ -546,8 +546,10 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2, EnvOptions soptions; soptions.use_mmap_reads = ioptions.allow_mmap_reads; + test::StringSink* sink = new test::StringSink(); + std::unique_ptr f(sink); file_writer.reset( - test::GetWritableFileWriter(new test::StringSink(), "" /* don't care */)); + new WritableFileWriter(std::move(f), "" /* don't care */, FileOptions())); std::unique_ptr builder; std::vector> int_tbl_prop_collector_factories; @@ -569,23 +571,20 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2, file_writer->Flush(); EXPECT_TRUE(s.ok()) << s.ToString(); - EXPECT_EQ( - test::GetStringSinkFromLegacyWriter(file_writer.get())->contents().size(), - builder->FileSize()); + EXPECT_EQ(sink->contents().size(), builder->FileSize()); // Open the table - file_reader.reset(test::GetRandomAccessFileReader(new test::StringSource( - test::GetStringSinkFromLegacyWriter(file_writer.get())->contents(), - 0 /*uniq_id*/, ioptions.allow_mmap_reads))); + test::StringSource* source = new test::StringSource( + sink->contents(), 0 /*uniq_id*/, ioptions.allow_mmap_reads); + std::unique_ptr file(source); + file_reader.reset(new RandomAccessFileReader(std::move(file), "test")); const bool kSkipFilters = true; const bool kImmortal = true; ASSERT_OK(ioptions.table_factory->NewTableReader( TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions, internal_comparator, !kSkipFilters, !kImmortal, level_), - std::move(file_reader), - test::GetStringSinkFromLegacyWriter(file_writer.get())->contents().size(), - &table_reader)); + std::move(file_reader), sink->contents().size(), &table_reader)); // Search using Get() ReadOptions ro; diff --git a/table/table_test.cc b/table/table_test.cc index db765d483..84a942dd4 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -345,8 +345,9 @@ class TableConstructor : public Constructor { const stl_wrappers::KVMap& kv_map) override { Reset(); soptions.use_mmap_reads = ioptions.allow_mmap_reads; - file_writer_.reset(test::GetWritableFileWriter(new test::StringSink(), - "" /* don't care */)); + std::unique_ptr sink(new test::StringSink()); + file_writer_.reset(new WritableFileWriter( + std::move(sink), "" /* don't care */, FileOptions())); std::unique_ptr builder; std::vector> int_tbl_prop_collector_factories; @@ -387,8 +388,10 @@ class TableConstructor : public Constructor { // Open the table uniq_id_ = cur_uniq_id_++; - file_reader_.reset(test::GetRandomAccessFileReader(new test::StringSource( - TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads))); + std::unique_ptr source(new test::StringSource( + TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads)); + + file_reader_.reset(new RandomAccessFileReader(std::move(source), "test")); const bool kSkipFilters = true; const bool kImmortal = true; return ioptions.table_factory->NewTableReader( @@ -425,8 +428,10 @@ class TableConstructor : public Constructor { virtual Status Reopen(const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions) { - file_reader_.reset(test::GetRandomAccessFileReader(new test::StringSource( - TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads))); + std::unique_ptr source(new test::StringSource( + TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads)); + + file_reader_.reset(new RandomAccessFileReader(std::move(source), "test")); return ioptions.table_factory->NewTableReader( TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions, *last_internal_key_), @@ -445,8 +450,7 @@ class TableConstructor : public Constructor { bool ConvertToInternalKey() { return convert_to_internal_key_; } test::StringSink* TEST_GetSink() { - return ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter( - file_writer_.get()); + return static_cast(file_writer_->writable_file()); } BlockCacheTracer block_cache_tracer_; @@ -1230,7 +1234,9 @@ class FileChecksumTestHelper { void CreateWriteableFile() { sink_ = new test::StringSink(); - file_writer_.reset(test::GetWritableFileWriter(sink_, "" /* don't care */)); + std::unique_ptr holder(sink_); + file_writer_.reset(new WritableFileWriter( + std::move(holder), "" /* don't care */, FileOptions())); } void SetFileChecksumGenerator(FileChecksumGenerator* checksum_generator) { @@ -1291,10 +1297,11 @@ class FileChecksumTestHelper { assert(file_checksum_generator != nullptr); cur_uniq_id_ = checksum_uniq_id_++; test::StringSink* ss_rw = - ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter( - file_writer_.get()); - file_reader_.reset(test::GetRandomAccessFileReader( - new test::StringSource(ss_rw->contents()))); + static_cast(file_writer_->writable_file()); + std::unique_ptr source( + new test::StringSource(ss_rw->contents())); + file_reader_.reset(new RandomAccessFileReader(std::move(source), "test")); + std::unique_ptr scratch(new char[2048]); Slice result; uint64_t offset = 0; @@ -3392,9 +3399,9 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) { plain_table_options.hash_table_ratio = 0; PlainTableFactory factory(plain_table_options); - test::StringSink sink; - std::unique_ptr file_writer( - test::GetWritableFileWriter(new test::StringSink(), "" /* don't care */)); + std::unique_ptr sink(new test::StringSink()); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(sink), "" /* don't care */, FileOptions())); Options options; const ImmutableCFOptions ioptions(options); const MutableCFOptions moptions(options); @@ -3421,10 +3428,11 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) { ASSERT_OK(file_writer->Flush()); test::StringSink* ss = - ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter(file_writer.get()); + static_cast(file_writer->writable_file()); + std::unique_ptr source( + new test::StringSource(ss->contents(), 72242, true)); std::unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss->contents(), 72242, true))); + new RandomAccessFileReader(std::move(source), "test")); TableProperties* props = nullptr; auto s = ReadTableProperties(file_reader.get(), ss->contents().size(), @@ -4052,8 +4060,9 @@ TEST_F(PrefixTest, PrefixAndWholeKeyTest) { TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) { BlockBasedTableOptions bbto = GetBlockBasedTableOptions(); test::StringSink* sink = new test::StringSink(); - std::unique_ptr file_writer( - test::GetWritableFileWriter(sink, "" /* don't care */)); + std::unique_ptr holder(sink); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(holder), "" /* don't care */, FileOptions())); Options options; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const ImmutableCFOptions ioptions(options); @@ -4090,9 +4099,10 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) { // Helper function to get version, global_seqno, global_seqno_offset std::function GetVersionAndGlobalSeqno = [&]() { + std::unique_ptr source( + new test::StringSource(ss_rw.contents(), 73342, true)); std::unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss_rw.contents(), 73342, true))); + new RandomAccessFileReader(std::move(source), "")); TableProperties* props = nullptr; ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(), @@ -4115,16 +4125,18 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) { std::string new_global_seqno; PutFixed64(&new_global_seqno, val); - ASSERT_OK(ss_rw.Write(global_seqno_offset, new_global_seqno)); + ASSERT_OK(ss_rw.Write(global_seqno_offset, new_global_seqno, IOOptions(), + nullptr)); }; // Helper function to get the contents of the table InternalIterator std::unique_ptr table_reader; const ReadOptions read_options; std::function GetTableInternalIter = [&]() { + std::unique_ptr source( + new test::StringSource(ss_rw.contents(), 73342, true)); std::unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss_rw.contents(), 73342, true))); + new RandomAccessFileReader(std::move(source), "")); options.table_factory->NewTableReader( TableReaderOptions(ioptions, moptions.prefix_extractor.get(), @@ -4236,8 +4248,9 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) { BlockBasedTableOptions bbto = GetBlockBasedTableOptions(); bbto.block_align = true; test::StringSink* sink = new test::StringSink(); - std::unique_ptr file_writer( - test::GetWritableFileWriter(sink, "" /* don't care */)); + std::unique_ptr holder(sink); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(holder), "" /* don't care */, FileOptions())); Options options; options.compression = kNoCompression; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); @@ -4267,17 +4280,16 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) { ASSERT_OK(builder->Finish()); ASSERT_OK(file_writer->Flush()); - test::RandomRWStringSink ss_rw(sink); + std::unique_ptr source( + new test::StringSource(sink->contents(), 73342, false)); std::unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss_rw.contents(), 73342, true))); - + new RandomAccessFileReader(std::move(source), "test")); // Helper function to get version, global_seqno, global_seqno_offset std::function VerifyBlockAlignment = [&]() { TableProperties* props = nullptr; - ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(), - kBlockBasedTableMagicNumber, ioptions, - &props, true /* compression_type_missing */)); + ASSERT_OK(ReadTableProperties(file_reader.get(), sink->contents().size(), + kBlockBasedTableMagicNumber, ioptions, &props, + true /* compression_type_missing */)); uint64_t data_block_size = props->data_size / props->num_data_blocks; ASSERT_EQ(data_block_size, 4096); @@ -4301,7 +4313,7 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) { TableReaderOptions(ioptions2, moptions2.prefix_extractor.get(), EnvOptions(), GetPlainInternalComparator(options2.comparator)), - std::move(file_reader), ss_rw.contents().size(), &table_reader)); + std::move(file_reader), sink->contents().size(), &table_reader)); ReadOptions read_options; std::unique_ptr db_iter(table_reader->NewIterator( @@ -4328,8 +4340,9 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) { BlockBasedTableOptions bbto = GetBlockBasedTableOptions(); bbto.block_align = true; test::StringSink* sink = new test::StringSink(); - std::unique_ptr file_writer( - test::GetWritableFileWriter(sink, "" /* don't care */)); + std::unique_ptr holder(sink); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(holder), "" /* don't care */, FileOptions())); Options options; options.compression = kNoCompression; @@ -4362,14 +4375,14 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) { ASSERT_OK(builder->Finish()); ASSERT_OK(file_writer->Flush()); - test::RandomRWStringSink ss_rw(sink); + std::unique_ptr source( + new test::StringSource(sink->contents(), 73342, true)); std::unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss_rw.contents(), 73342, true))); + new RandomAccessFileReader(std::move(source), "test")); { RandomAccessFileReader* file = file_reader.get(); - uint64_t file_size = ss_rw.contents().size(); + uint64_t file_size = sink->contents().size(); Footer footer; IOOptions opts; @@ -4452,10 +4465,11 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) { // get file reader test::StringSink* table_sink = c.TEST_GetSink(); - std::unique_ptr table_reader{ - test::GetRandomAccessFileReader( - new test::StringSource(table_sink->contents(), 0 /* unique_id */, - false /* allow_mmap_reads */))}; + std::unique_ptr source(new test::StringSource( + table_sink->contents(), 0 /* unique_id */, false /* allow_mmap_reads */)); + + std::unique_ptr table_reader( + new RandomAccessFileReader(std::move(source), "test")); size_t table_size = table_sink->contents().size(); // read footer diff --git a/test_util/testutil.cc b/test_util/testutil.cc index f85ee224a..f0df928f1 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -171,25 +171,6 @@ const Comparator* ComparatorWithU64Ts() { return &comp_with_u64_ts; } -WritableFileWriter* GetWritableFileWriter(WritableFile* wf, - const std::string& fname) { - std::unique_ptr file(wf); - return new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)), - fname, EnvOptions()); -} - -RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf) { - std::unique_ptr file(raf); - return new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), - "[test RandomAccessFileReader]"); -} - -SequentialFileReader* GetSequentialFileReader(SequentialFile* se, - const std::string& fname) { - std::unique_ptr file(se); - return new SequentialFileReader(NewLegacySequentialFileWrapper(file), fname); -} - void CorruptKeyType(InternalKey* ikey) { std::string keystr = ikey->Encode().ToString(); keystr[keystr.size() - 8] = kTypeLogData; diff --git a/test_util/testutil.h b/test_util/testutil.h index 9098200d0..00a768d50 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -178,23 +178,16 @@ class VectorIterator : public InternalIterator { std::vector values_; size_t current_; }; -extern WritableFileWriter* GetWritableFileWriter(WritableFile* wf, - const std::string& fname); -extern RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf); - -extern SequentialFileReader* GetSequentialFileReader(SequentialFile* se, - const std::string& fname); - -class StringSink: public WritableFile { +class StringSink : public FSWritableFile { public: std::string contents_; - explicit StringSink(Slice* reader_contents = nullptr) : - WritableFile(), - contents_(""), - reader_contents_(reader_contents), - last_flush_(0) { + explicit StringSink(Slice* reader_contents = nullptr) + : FSWritableFile(), + contents_(""), + reader_contents_(reader_contents), + last_flush_(0) { if (reader_contents_ != nullptr) { *reader_contents_ = Slice(contents_.data(), 0); } @@ -202,12 +195,15 @@ class StringSink: public WritableFile { const std::string& contents() const { return contents_; } - virtual Status Truncate(uint64_t size) override { + IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { contents_.resize(static_cast(size)); - return Status::OK(); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); } - virtual Status Close() override { return Status::OK(); } - virtual Status Flush() override { + IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { if (reader_contents_ != nullptr) { assert(reader_contents_->size() <= last_flush_); size_t offset = last_flush_ - reader_contents_->size(); @@ -217,12 +213,17 @@ class StringSink: public WritableFile { last_flush_ = contents_.size(); } - return Status::OK(); + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); } - virtual Status Sync() override { return Status::OK(); } - virtual Status Append(const Slice& slice) override { + + using FSWritableFile::Append; + IOStatus Append(const Slice& slice, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { contents_.append(slice.data(), slice.size()); - return Status::OK(); + return IOStatus::OK(); } void Drop(size_t bytes) { if (reader_contents_ != nullptr) { @@ -239,36 +240,44 @@ class StringSink: public WritableFile { }; // A wrapper around a StringSink to give it a RandomRWFile interface -class RandomRWStringSink : public RandomRWFile { +class RandomRWStringSink : public FSRandomRWFile { public: explicit RandomRWStringSink(StringSink* ss) : ss_(ss) {} - Status Write(uint64_t offset, const Slice& data) override { + IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { if (offset + data.size() > ss_->contents_.size()) { ss_->contents_.resize(static_cast(offset) + data.size(), '\0'); } char* pos = const_cast(ss_->contents_.data() + offset); memcpy(pos, data.data(), data.size()); - return Status::OK(); + return IOStatus::OK(); } - Status Read(uint64_t offset, size_t n, Slice* result, - char* /*scratch*/) const override { + IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*opts*/, + Slice* result, char* /*scratch*/, + IODebugContext* /*dbg*/) const override { *result = Slice(nullptr, 0); if (offset < ss_->contents_.size()) { size_t str_res_sz = std::min(static_cast(ss_->contents_.size() - offset), n); *result = Slice(ss_->contents_.data() + offset, str_res_sz); } - return Status::OK(); + return IOStatus::OK(); } - Status Flush() override { return Status::OK(); } + IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } - Status Sync() override { return Status::OK(); } + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } - Status Close() override { return Status::OK(); } + IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } const std::string& contents() const { return ss_->contents(); } @@ -279,34 +288,42 @@ class RandomRWStringSink : public RandomRWFile { // Like StringSink, this writes into a string. Unlink StringSink, it // has some initial content and overwrites it, just like a recycled // log file. -class OverwritingStringSink : public WritableFile { +class OverwritingStringSink : public FSWritableFile { public: explicit OverwritingStringSink(Slice* reader_contents) - : WritableFile(), + : FSWritableFile(), contents_(""), reader_contents_(reader_contents), last_flush_(0) {} const std::string& contents() const { return contents_; } - virtual Status Truncate(uint64_t size) override { + IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { contents_.resize(static_cast(size)); - return Status::OK(); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); } - virtual Status Close() override { return Status::OK(); } - virtual Status Flush() override { + IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { if (last_flush_ < contents_.size()) { assert(reader_contents_->size() >= contents_.size()); memcpy((char*)reader_contents_->data() + last_flush_, contents_.data() + last_flush_, contents_.size() - last_flush_); last_flush_ = contents_.size(); } - return Status::OK(); + return IOStatus::OK(); } - virtual Status Sync() override { return Status::OK(); } - virtual Status Append(const Slice& slice) override { + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + using FSWritableFile::Append; + IOStatus Append(const Slice& slice, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { contents_.append(slice.data(), slice.size()); - return Status::OK(); + return IOStatus::OK(); } void Drop(size_t bytes) { contents_.resize(contents_.size() - bytes); @@ -319,7 +336,7 @@ class OverwritingStringSink : public WritableFile { size_t last_flush_; }; -class StringSource: public RandomAccessFile { +class StringSource : public FSRandomAccessFile { public: explicit StringSource(const Slice& contents, uint64_t uniq_id = 0, bool mmap = false) @@ -332,11 +349,23 @@ class StringSource: public RandomAccessFile { uint64_t Size() const { return contents_.size(); } - virtual Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override { + IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + // If we are using mmap_, it is equivalent to performing a prefetch + if (mmap_) { + return IOStatus::OK(); + } else { + return IOStatus::NotSupported("Prefetch not supported"); + } + } + + IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*opts*/, + Slice* result, char* scratch, + IODebugContext* /*dbg*/) const override { total_reads_++; if (offset > contents_.size()) { - return Status::InvalidArgument("invalid Read offset"); + return IOStatus::InvalidArgument("invalid Read offset"); } if (offset + n > contents_.size()) { n = contents_.size() - static_cast(offset); @@ -347,10 +376,10 @@ class StringSource: public RandomAccessFile { } else { *result = Slice(&contents_[static_cast(offset)], n); } - return Status::OK(); + return IOStatus::OK(); } - virtual size_t GetUniqueId(char* id, size_t max_size) const override { + size_t GetUniqueId(char* id, size_t max_size) const override { if (max_size < 20) { return 0; } @@ -372,13 +401,6 @@ class StringSource: public RandomAccessFile { mutable int total_reads_; }; -inline StringSink* GetStringSinkFromLegacyWriter( - const WritableFileWriter* writer) { - LegacyWritableFileWrapper* file = - static_cast(writer->writable_file()); - return static_cast(file->target()); -} - class NullLogger : public Logger { public: using Logger::Logv; @@ -525,176 +547,220 @@ inline std::string EncodeInt(uint64_t x) { return result; } - class SeqStringSource : public SequentialFile { - public: - SeqStringSource(const std::string& data, std::atomic* read_count) - : data_(data), offset_(0), read_count_(read_count) {} - ~SeqStringSource() override {} - Status Read(size_t n, Slice* result, char* scratch) override { - std::string output; - if (offset_ < data_.size()) { - n = std::min(data_.size() - offset_, n); - memcpy(scratch, data_.data() + offset_, n); - offset_ += n; - *result = Slice(scratch, n); - } else { - return Status::InvalidArgument( - "Attemp to read when it already reached eof."); - } - (*read_count_)++; - return Status::OK(); +class SeqStringSource : public FSSequentialFile { + public: + SeqStringSource(const std::string& data, std::atomic* read_count) + : data_(data), offset_(0), read_count_(read_count) {} + ~SeqStringSource() override {} + IOStatus Read(size_t n, const IOOptions& /*opts*/, Slice* result, + char* scratch, IODebugContext* /*dbg*/) override { + std::string output; + if (offset_ < data_.size()) { + n = std::min(data_.size() - offset_, n); + memcpy(scratch, data_.data() + offset_, n); + offset_ += n; + *result = Slice(scratch, n); + } else { + return IOStatus::InvalidArgument( + "Attempt to read when it already reached eof."); } - Status Skip(uint64_t n) override { - if (offset_ >= data_.size()) { - return Status::InvalidArgument( - "Attemp to read when it already reached eof."); - } - // TODO(yhchiang): Currently doesn't handle the overflow case. - offset_ += static_cast(n); - return Status::OK(); + (*read_count_)++; + return IOStatus::OK(); + } + + IOStatus Skip(uint64_t n) override { + if (offset_ >= data_.size()) { + return IOStatus::InvalidArgument( + "Attempt to read when it already reached eof."); } + // TODO(yhchiang): Currently doesn't handle the overflow case. + offset_ += static_cast(n); + return IOStatus::OK(); + } - private: - std::string data_; - size_t offset_; - std::atomic* read_count_; - }; + private: + std::string data_; + size_t offset_; + std::atomic* read_count_; +}; - class StringEnv : public EnvWrapper { +class StringFS : public FileSystemWrapper { + public: + class StringSink : public FSWritableFile { public: - class StringSink : public WritableFile { - public: - explicit StringSink(std::string* contents) - : WritableFile(), contents_(contents) {} - virtual Status Truncate(uint64_t size) override { - contents_->resize(static_cast(size)); - return Status::OK(); - } - virtual Status Close() override { return Status::OK(); } - virtual Status Flush() override { return Status::OK(); } - virtual Status Sync() override { return Status::OK(); } - virtual Status Append(const Slice& slice) override { - contents_->append(slice.data(), slice.size()); - return Status::OK(); - } + explicit StringSink(std::string* contents) + : FSWritableFile(), contents_(contents) {} + IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + contents_->resize(static_cast(size)); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } - private: - std::string* contents_; - }; + using FSWritableFile::Append; + IOStatus Append(const Slice& slice, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + contents_->append(slice.data(), slice.size()); + return IOStatus::OK(); + } - explicit StringEnv(Env* t) : EnvWrapper(t) {} - ~StringEnv() override {} + private: + std::string* contents_; + }; - const std::string& GetContent(const std::string& f) { return files_[f]; } + explicit StringFS(const std::shared_ptr& t) + : FileSystemWrapper(t) {} + ~StringFS() override {} - const Status WriteToNewFile(const std::string& file_name, + const std::string& GetContent(const std::string& f) { return files_[f]; } + + const IOStatus WriteToNewFile(const std::string& file_name, const std::string& content) { - std::unique_ptr r; - auto s = NewWritableFile(file_name, &r, EnvOptions()); - if (s.ok()) { - s = r->Append(content); - } - if (s.ok()) { - s = r->Flush(); - } - if (s.ok()) { - s = r->Close(); - } - assert(!s.ok() || files_[file_name] == content); - return s; - } + std::unique_ptr r; + FileOptions file_opts; + IOOptions io_opts; - // The following text is boilerplate that forwards all methods to target() - Status NewSequentialFile(const std::string& f, - std::unique_ptr* r, - const EnvOptions& /*options*/) override { - auto iter = files_.find(f); - if (iter == files_.end()) { - return Status::NotFound("The specified file does not exist", f); - } - r->reset(new SeqStringSource(iter->second, &num_seq_file_read_)); - return Status::OK(); - } - Status NewRandomAccessFile(const std::string& /*f*/, - std::unique_ptr* /*r*/, - const EnvOptions& /*options*/) override { - return Status::NotSupported(); - } - Status NewWritableFile(const std::string& f, - std::unique_ptr* r, - const EnvOptions& /*options*/) override { - auto iter = files_.find(f); - if (iter != files_.end()) { - return Status::IOError("The specified file already exists", f); - } - r->reset(new StringSink(&files_[f])); - return Status::OK(); - } - virtual Status NewDirectory( - const std::string& /*name*/, - std::unique_ptr* /*result*/) override { - return Status::NotSupported(); - } - Status FileExists(const std::string& f) override { - if (files_.find(f) == files_.end()) { - return Status::NotFound(); - } - return Status::OK(); - } - Status GetChildren(const std::string& /*dir*/, - std::vector* /*r*/) override { - return Status::NotSupported(); - } - Status DeleteFile(const std::string& f) override { - files_.erase(f); - return Status::OK(); - } - Status CreateDir(const std::string& /*d*/) override { - return Status::NotSupported(); + auto s = NewWritableFile(file_name, file_opts, &r, nullptr); + if (s.ok()) { + s = r->Append(content, io_opts, nullptr); } - Status CreateDirIfMissing(const std::string& /*d*/) override { - return Status::NotSupported(); + if (s.ok()) { + s = r->Flush(io_opts, nullptr); } - Status DeleteDir(const std::string& /*d*/) override { - return Status::NotSupported(); - } - Status GetFileSize(const std::string& f, uint64_t* s) override { - auto iter = files_.find(f); - if (iter == files_.end()) { - return Status::NotFound("The specified file does not exist:", f); - } - *s = iter->second.size(); - return Status::OK(); + if (s.ok()) { + s = r->Close(io_opts, nullptr); } + assert(!s.ok() || files_[file_name] == content); + return s; + } - Status GetFileModificationTime(const std::string& /*fname*/, - uint64_t* /*file_mtime*/) override { - return Status::NotSupported(); + // The following text is boilerplate that forwards all methods to target() + IOStatus NewSequentialFile(const std::string& f, + const FileOptions& /*options*/, + std::unique_ptr* r, + IODebugContext* /*dbg*/) override { + auto iter = files_.find(f); + if (iter == files_.end()) { + return IOStatus::NotFound("The specified file does not exist", f); } + r->reset(new SeqStringSource(iter->second, &num_seq_file_read_)); + return IOStatus::OK(); + } - Status RenameFile(const std::string& /*s*/, - const std::string& /*t*/) override { - return Status::NotSupported(); - } + IOStatus NewRandomAccessFile(const std::string& /*f*/, + const FileOptions& /*options*/, + std::unique_ptr* /*r*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } - Status LinkFile(const std::string& /*s*/, - const std::string& /*t*/) override { - return Status::NotSupported(); + IOStatus NewWritableFile(const std::string& f, const FileOptions& /*options*/, + std::unique_ptr* r, + IODebugContext* /*dbg*/) override { + auto iter = files_.find(f); + if (iter != files_.end()) { + return IOStatus::IOError("The specified file already exists", f); } + r->reset(new StringSink(&files_[f])); + return IOStatus::OK(); + } + IOStatus NewDirectory(const std::string& /*name*/, + const IOOptions& /*options*/, + std::unique_ptr* /*result*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } - Status LockFile(const std::string& /*f*/, FileLock** /*l*/) override { - return Status::NotSupported(); + IOStatus FileExists(const std::string& f, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + if (files_.find(f) == files_.end()) { + return IOStatus::NotFound(); } + return IOStatus::OK(); + } + + IOStatus GetChildren(const std::string& /*dir*/, const IOOptions& /*options*/, + std::vector* /*r*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } - Status UnlockFile(FileLock* /*l*/) override { - return Status::NotSupported(); + IOStatus DeleteFile(const std::string& f, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + files_.erase(f); + return IOStatus::OK(); + } + + IOStatus CreateDir(const std::string& /*d*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus CreateDirIfMissing(const std::string& /*d*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus DeleteDir(const std::string& /*d*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus GetFileSize(const std::string& f, const IOOptions& /*options*/, + uint64_t* s, IODebugContext* /*dbg*/) override { + auto iter = files_.find(f); + if (iter == files_.end()) { + return IOStatus::NotFound("The specified file does not exist:", f); } + *s = iter->second.size(); + return IOStatus::OK(); + } - std::atomic num_seq_file_read_; + IOStatus GetFileModificationTime(const std::string& /*fname*/, + const IOOptions& /*options*/, + uint64_t* /*file_mtime*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } - protected: - std::unordered_map files_; - }; + IOStatus RenameFile(const std::string& /*s*/, const std::string& /*t*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus LinkFile(const std::string& /*s*/, const std::string& /*t*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus LockFile(const std::string& /*f*/, const IOOptions& /*options*/, + FileLock** /*l*/, IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus UnlockFile(FileLock* /*l*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + std::atomic num_seq_file_read_; + + protected: + std::unordered_map files_; +}; // Randomly initialize the given DBOptions void RandomInitDBOptions(DBOptions* db_opt, Random* rnd); diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 2abd50b14..c883e9016 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -246,19 +246,21 @@ class ReadaheadRandomAccessFileTest ReadaheadRandomAccessFileTest() : control_contents_() {} std::string Read(uint64_t offset, size_t n) { Slice result; - Status s = test_read_holder_->Read(offset, n, &result, scratch_.get()); + Status s = test_read_holder_->Read(offset, n, IOOptions(), &result, + scratch_.get(), nullptr); EXPECT_TRUE(s.ok() || s.IsInvalidArgument()); return std::string(result.data(), result.size()); } void ResetSourceStr(const std::string& str = "") { - auto write_holder = - std::unique_ptr(test::GetWritableFileWriter( - new test::StringSink(&control_contents_), "" /* don't care */)); + std::unique_ptr sink( + new test::StringSink(&control_contents_)); + std::unique_ptr write_holder(new WritableFileWriter( + std::move(sink), "" /* don't care */, FileOptions())); Status s = write_holder->Append(Slice(str)); EXPECT_OK(s); s = write_holder->Flush(); EXPECT_OK(s); - auto read_holder = std::unique_ptr( + std::unique_ptr read_holder( new test::StringSource(control_contents_)); test_read_holder_ = NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_); @@ -268,7 +270,7 @@ class ReadaheadRandomAccessFileTest private: size_t readahead_size_; Slice control_contents_; - std::unique_ptr test_read_holder_; + std::unique_ptr test_read_holder_; std::unique_ptr scratch_; }; @@ -353,10 +355,10 @@ class ReadaheadSequentialFileTest : public testing::Test, } void Skip(size_t n) { test_read_holder_->Skip(n); } void ResetSourceStr(const std::string& str = "") { - auto read_holder = std::unique_ptr( + auto read_holder = std::unique_ptr( new test::SeqStringSource(str, &seq_read_count_)); - test_read_holder_.reset(new SequentialFileReader( - NewLegacySequentialFileWrapper(read_holder), "test", readahead_size_)); + test_read_holder_.reset(new SequentialFileReader(std::move(read_holder), + "test", readahead_size_)); } size_t GetReadaheadSize() const { return readahead_size_; } diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index 8f425f730..7fb3d6b3b 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -5,17 +5,19 @@ #ifndef ROCKSDB_LITE #include "utilities/blob_db/blob_dump_tool.h" + #include + #include #include #include #include -#include "env/composite_env_wrapper.h" + #include "file/random_access_file_reader.h" #include "file/readahead_raf.h" #include "port/port.h" #include "rocksdb/convenience.h" -#include "rocksdb/env.h" +#include "rocksdb/file_system.h" #include "table/format.h" #include "util/coding.h" #include "util/string_util.h" @@ -32,18 +34,19 @@ Status BlobDumpTool::Run(const std::string& filename, DisplayType show_key, bool show_summary) { constexpr size_t kReadaheadSize = 2 * 1024 * 1024; Status s; - Env* env = Env::Default(); - s = env->FileExists(filename); + const auto fs = FileSystem::Default(); + IOOptions io_opts; + s = fs->FileExists(filename, io_opts, nullptr); if (!s.ok()) { return s; } uint64_t file_size = 0; - s = env->GetFileSize(filename, &file_size); + s = fs->GetFileSize(filename, io_opts, &file_size, nullptr); if (!s.ok()) { return s; } - std::unique_ptr file; - s = env->NewRandomAccessFile(filename, &file, EnvOptions()); + std::unique_ptr file; + s = fs->NewRandomAccessFile(filename, FileOptions(), &file, nullptr); if (!s.ok()) { return s; } @@ -51,8 +54,7 @@ Status BlobDumpTool::Run(const std::string& filename, DisplayType show_key, if (file_size == 0) { return Status::Corruption("File is empty."); } - reader_.reset(new RandomAccessFileReader( - NewLegacyRandomAccessFileWrapper(file), filename)); + reader_.reset(new RandomAccessFileReader(std::move(file), filename)); uint64_t offset = 0; uint64_t footer_offset = 0; CompressionType compression = kNoCompression;