diff --git a/CMakeLists.txt b/CMakeLists.txt index dd8adb9d3..79c7fc932 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -578,6 +578,7 @@ set(SOURCES table/plain_table_index.cc table/plain_table_key_coding.cc table/plain_table_reader.cc + table/sst_file_reader.cc table/sst_file_writer.cc table/table_properties.cc table/two_level_iterator.cc @@ -935,6 +936,7 @@ if(WITH_TESTS) table/data_block_hash_index_test.cc table/full_filter_block_test.cc table/merger_test.cc + table/sst_file_reader_test.cc table/table_test.cc tools/ldb_cmd_test.cc tools/reduce_levels_test.cc diff --git a/Makefile b/Makefile index 5d6877282..d4d563f4d 100644 --- a/Makefile +++ b/Makefile @@ -555,6 +555,7 @@ TESTS = \ repeatable_thread_test \ range_tombstone_fragmenter_test \ range_del_aggregator_v2_test \ + sst_file_reader_test \ PARALLEL_TEST = \ backupable_db_test \ @@ -1590,6 +1591,9 @@ range_tombstone_fragmenter_test: db/range_tombstone_fragmenter_test.o db/db_test range_del_aggregator_v2_test: db/range_del_aggregator_v2_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +sst_file_reader_test: table/sst_file_reader_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + #------------------------------------------------- # make install related stuff INSTALL_PATH ?= /usr/local diff --git a/TARGETS b/TARGETS index 5ee4fe2f8..2088e27e4 100644 --- a/TARGETS +++ b/TARGETS @@ -202,6 +202,7 @@ cpp_library( "table/plain_table_index.cc", "table/plain_table_key_coding.cc", "table/plain_table_reader.cc", + "table/sst_file_reader.cc", "table/sst_file_writer.cc", "table/table_properties.cc", "table/two_level_iterator.cc", @@ -1098,6 +1099,11 @@ ROCKS_TESTS = [ "utilities/transactions/write_unprepared_transaction_test.cc", "parallel", ], + [ + "sst_file_reader_test", + "table/sst_file_reader_test.cc", + "serial", + ], ] # Generate a test rule for each entry in ROCKS_TESTS diff --git a/include/rocksdb/sst_file_reader.h b/include/rocksdb/sst_file_reader.h new file mode 100644 index 000000000..e58c84792 --- /dev/null +++ b/include/rocksdb/sst_file_reader.h @@ -0,0 +1,45 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include "rocksdb/slice.h" +#include "rocksdb/options.h" +#include "rocksdb/iterator.h" +#include "rocksdb/table_properties.h" + +namespace rocksdb { + +// SstFileReader is used to read sst files that are generated by DB or +// SstFileWriter. +class SstFileReader { + public: + SstFileReader(const Options& options); + + ~SstFileReader(); + + // Prepares to read from the file located at "file_path". + Status Open(const std::string& file_path); + + // Returns a new iterator over the table contents. + // Most read options provide the same control as we read from DB. + // If "snapshot" is nullptr, the iterator returns only the latest keys. + Iterator* NewIterator(const ReadOptions& options); + + std::shared_ptr GetTableProperties() const; + + // Verifies whether there is corruption in this table. + Status VerifyChecksum(); + + private: + struct Rep; + std::unique_ptr rep_; +}; + +} // namespace rocksdb + +#endif // !ROCKSDB_LITE diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index 13e008260..a99c8bf6e 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -452,7 +452,7 @@ class TableFactory { // NewTableReader() is called in three places: // (1) TableCache::FindTable() calls the function when table cache miss // and cache the table object returned. - // (2) SstFileReader (for SST Dump) opens the table and dump the table + // (2) SstFileDumper (for SST Dump) opens the table and dump the table // contents using the iterator of the table. // (3) DBImpl::IngestExternalFile() calls this function to read the contents of // the sst file it's attempting to add diff --git a/src.mk b/src.mk index 4f895b981..e0479373c 100644 --- a/src.mk +++ b/src.mk @@ -122,6 +122,7 @@ LIB_SOURCES = \ table/plain_table_index.cc \ table/plain_table_key_coding.cc \ table/plain_table_reader.cc \ + table/sst_file_reader.cc \ table/sst_file_writer.cc \ table/table_properties.cc \ table/two_level_iterator.cc \ @@ -362,6 +363,7 @@ MAIN_SOURCES = \ table/data_block_hash_index_test.cc \ table/full_filter_block_test.cc \ table/merger_test.cc \ + table/sst_file_reader_test.cc \ table/table_reader_bench.cc \ table/table_test.cc \ third-party/gtest-1.7.0/fused-src/gtest/gtest-all.cc \ diff --git a/table/sst_file_reader.cc b/table/sst_file_reader.cc new file mode 100644 index 000000000..a915449be --- /dev/null +++ b/table/sst_file_reader.cc @@ -0,0 +1,84 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "rocksdb/sst_file_reader.h" + +#include "db/db_iter.h" +#include "options/cf_options.h" +#include "table/get_context.h" +#include "table/table_reader.h" +#include "table/table_builder.h" +#include "util/file_reader_writer.h" + +namespace rocksdb { + +struct SstFileReader::Rep { + Options options; + EnvOptions soptions; + ImmutableCFOptions ioptions; + MutableCFOptions moptions; + + std::unique_ptr table_reader; + + Rep(const Options& opts) + : options(opts), + soptions(options), + ioptions(options), + moptions(ColumnFamilyOptions(options)) {} +}; + +SstFileReader::SstFileReader(const Options& options) + : rep_(new Rep(options)) {} + +SstFileReader::~SstFileReader() {} + +Status SstFileReader::Open(const std::string& file_path) { + auto r = rep_.get(); + Status s; + uint64_t file_size = 0; + std::unique_ptr file; + std::unique_ptr file_reader; + s = r->options.env->GetFileSize(file_path, &file_size); + if (s.ok()) { + s = r->options.env->NewRandomAccessFile(file_path, &file, r->soptions); + } + if (s.ok()) { + file_reader.reset(new RandomAccessFileReader(std::move(file), file_path)); + } + if (s.ok()) { + s = r->options.table_factory->NewTableReader( + TableReaderOptions(r->ioptions, r->moptions.prefix_extractor.get(), + r->soptions, r->ioptions.internal_comparator), + std::move(file_reader), file_size, &r->table_reader); + } + return s; +} + +Iterator* SstFileReader::NewIterator(const ReadOptions& options) { + auto r = rep_.get(); + auto sequence = options.snapshot != nullptr ? + options.snapshot->GetSequenceNumber() : + kMaxSequenceNumber; + auto internal_iter = r->table_reader->NewIterator( + options, r->moptions.prefix_extractor.get()); + return NewDBIterator(r->options.env, options, r->ioptions, r->moptions, + r->ioptions.user_comparator, internal_iter, sequence, + r->moptions.max_sequential_skip_in_iterations, + nullptr /* read_callback */); +} + +std::shared_ptr SstFileReader::GetTableProperties() const { + return rep_->table_reader->GetTableProperties(); +} + +Status SstFileReader::VerifyChecksum() { + return rep_->table_reader->VerifyChecksum(); +} + +} // namespace rocksdb + +#endif // !ROCKSDB_LITE diff --git a/table/sst_file_reader_test.cc b/table/sst_file_reader_test.cc new file mode 100644 index 000000000..e78a8bd4c --- /dev/null +++ b/table/sst_file_reader_test.cc @@ -0,0 +1,98 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include + +#include "rocksdb/sst_file_reader.h" +#include "rocksdb/sst_file_writer.h" +#include "util/testharness.h" +#include "util/testutil.h" +#include "utilities/merge_operators.h" + +namespace rocksdb { + +std::string EncodeAsString(uint64_t v) { + char buf[16]; + snprintf(buf, sizeof(buf), "%08" PRIu64, v); + return std::string(buf); +} + +std::string EncodeAsUint64(uint64_t v) { + std::string dst; + PutFixed64(&dst, v); + return dst; +} + +class SstFileReaderTest : public testing::Test { + public: + SstFileReaderTest() { + options_.merge_operator = MergeOperators::CreateUInt64AddOperator(); + sst_name_ = test::PerThreadDBPath("sst_file"); + } + + void CreateFileAndCheck(const std::vector& keys) { + SstFileWriter writer(soptions_, options_); + ASSERT_OK(writer.Open(sst_name_)); + for (size_t i = 0; i + 2 < keys.size(); i += 3) { + ASSERT_OK(writer.Put(keys[i], keys[i])); + ASSERT_OK(writer.Merge(keys[i+1], EncodeAsUint64(i+1))); + ASSERT_OK(writer.Delete(keys[i+2])); + } + ASSERT_OK(writer.Finish()); + + ReadOptions ropts; + SstFileReader reader(options_); + ASSERT_OK(reader.Open(sst_name_)); + ASSERT_OK(reader.VerifyChecksum()); + std::unique_ptr iter(reader.NewIterator(ropts)); + iter->SeekToFirst(); + for (size_t i = 0; i + 2 < keys.size(); i += 3) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(keys[i]), 0); + ASSERT_EQ(iter->value().compare(keys[i]), 0); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(keys[i+1]), 0); + ASSERT_EQ(iter->value().compare(EncodeAsUint64(i+1)), 0); + iter->Next(); + } + ASSERT_FALSE(iter->Valid()); + } + + protected: + Options options_; + EnvOptions soptions_; + std::string sst_name_; +}; + +const uint64_t kNumKeys = 100; + +TEST_F(SstFileReaderTest, Basic) { + std::vector keys; + for (uint64_t i = 0; i < kNumKeys; i++) { + keys.emplace_back(EncodeAsString(i)); + } + CreateFileAndCheck(keys); +} + +TEST_F(SstFileReaderTest, Uint64Comparator) { + options_.comparator = test::Uint64Comparator(); + std::vector keys; + for (uint64_t i = 0; i < kNumKeys; i++) { + keys.emplace_back(EncodeAsUint64(i)); + } + CreateFileAndCheck(keys); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#endif // ROCKSDB_LITE diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 8ca385447..997718ef2 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -2845,8 +2845,8 @@ void DumpSstFile(std::string filename, bool output_hex, bool show_properties) { return; } // no verification - rocksdb::SstFileReader reader(filename, false, output_hex); - Status st = reader.ReadSequential(true, std::numeric_limits::max(), false, // has_from + rocksdb::SstFileDumper dumper(filename, false, output_hex); + Status st = dumper.ReadSequential(true, std::numeric_limits::max(), false, // has_from from_key, false, // has_to to_key); if (!st.ok()) { @@ -2860,11 +2860,11 @@ void DumpSstFile(std::string filename, bool output_hex, bool show_properties) { std::shared_ptr table_properties_from_reader; - st = reader.ReadTableProperties(&table_properties_from_reader); + st = dumper.ReadTableProperties(&table_properties_from_reader); if (!st.ok()) { std::cerr << filename << ": " << st.ToString() << ". Try to use initial table properties" << std::endl; - table_properties = reader.GetInitTableProperties(); + table_properties = dumper.GetInitTableProperties(); } else { table_properties = table_properties_from_reader.get(); } diff --git a/tools/sst_dump_tool.cc b/tools/sst_dump_tool.cc index d2e823a98..25699777e 100644 --- a/tools/sst_dump_tool.cc +++ b/tools/sst_dump_tool.cc @@ -43,7 +43,7 @@ namespace rocksdb { -SstFileReader::SstFileReader(const std::string& file_path, bool verify_checksum, +SstFileDumper::SstFileDumper(const std::string& file_path, bool verify_checksum, bool output_hex) : file_name_(file_path), read_num_(0), @@ -74,7 +74,7 @@ static const std::vector> {CompressionType::kXpressCompression, "kXpressCompression"}, {CompressionType::kZSTD, "kZSTD"}}; -Status SstFileReader::GetTableReader(const std::string& file_path) { +Status SstFileDumper::GetTableReader(const std::string& file_path) { // Warning about 'magic_number' being uninitialized shows up only in UBsan // builds. Though access is guarded by 's.ok()' checks, fix the issue to // avoid any warnings. @@ -123,7 +123,7 @@ Status SstFileReader::GetTableReader(const std::string& file_path) { return s; } -Status SstFileReader::NewTableReader( +Status SstFileDumper::NewTableReader( const ImmutableCFOptions& /*ioptions*/, const EnvOptions& /*soptions*/, const InternalKeyComparator& /*internal_comparator*/, uint64_t file_size, std::unique_ptr* /*table_reader*/) { @@ -143,11 +143,11 @@ Status SstFileReader::NewTableReader( std::move(file_), file_size, &table_reader_); } -Status SstFileReader::VerifyChecksum() { +Status SstFileDumper::VerifyChecksum() { return table_reader_->VerifyChecksum(); } -Status SstFileReader::DumpTable(const std::string& out_filename) { +Status SstFileDumper::DumpTable(const std::string& out_filename) { std::unique_ptr out_file; Env* env = Env::Default(); env->NewWritableFile(out_filename, &out_file, soptions_); @@ -157,7 +157,7 @@ Status SstFileReader::DumpTable(const std::string& out_filename) { return s; } -uint64_t SstFileReader::CalculateCompressedTableSize( +uint64_t SstFileDumper::CalculateCompressedTableSize( const TableBuilderOptions& tb_options, size_t block_size) { std::unique_ptr out_file; std::unique_ptr env(NewMemEnv(Env::Default())); @@ -192,7 +192,7 @@ uint64_t SstFileReader::CalculateCompressedTableSize( return size; } -int SstFileReader::ShowAllCompressionSizes( +int SstFileDumper::ShowAllCompressionSizes( size_t block_size, const std::vector>& compression_types) { @@ -226,7 +226,7 @@ int SstFileReader::ShowAllCompressionSizes( return 0; } -Status SstFileReader::ReadTableProperties(uint64_t table_magic_number, +Status SstFileDumper::ReadTableProperties(uint64_t table_magic_number, RandomAccessFileReader* file, uint64_t file_size) { TableProperties* table_properties = nullptr; @@ -240,7 +240,7 @@ Status SstFileReader::ReadTableProperties(uint64_t table_magic_number, return s; } -Status SstFileReader::SetTableOptionsByMagicNumber( +Status SstFileDumper::SetTableOptionsByMagicNumber( uint64_t table_magic_number) { assert(table_properties_); if (table_magic_number == kBlockBasedTableMagicNumber || @@ -283,7 +283,7 @@ Status SstFileReader::SetTableOptionsByMagicNumber( return Status::OK(); } -Status SstFileReader::SetOldTableOptions() { +Status SstFileDumper::SetOldTableOptions() { assert(table_properties_ == nullptr); options_.table_factory = std::make_shared(); fprintf(stdout, "Sst file format: block-based(old version)\n"); @@ -291,7 +291,7 @@ Status SstFileReader::SetOldTableOptions() { return Status::OK(); } -Status SstFileReader::ReadSequential(bool print_kv, uint64_t read_num, +Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num, bool has_from, const std::string& from_key, bool has_to, const std::string& to_key, bool use_from_as_prefix) { @@ -348,7 +348,7 @@ Status SstFileReader::ReadSequential(bool print_kv, uint64_t read_num, return ret; } -Status SstFileReader::ReadTableProperties( +Status SstFileDumper::ReadTableProperties( std::shared_ptr* table_properties) { if (!table_reader_) { return init_result_; @@ -570,16 +570,16 @@ int SSTDumpTool::Run(int argc, char** argv) { filename = std::string(dir_or_file) + "/" + filename; } - rocksdb::SstFileReader reader(filename, verify_checksum, + rocksdb::SstFileDumper dumper(filename, verify_checksum, output_hex); - if (!reader.getStatus().ok()) { + if (!dumper.getStatus().ok()) { fprintf(stderr, "%s: %s\n", filename.c_str(), - reader.getStatus().ToString().c_str()); + dumper.getStatus().ToString().c_str()); continue; } if (command == "recompress") { - reader.ShowAllCompressionSizes( + dumper.ShowAllCompressionSizes( set_block_size ? block_size : 16384, compression_types.empty() ? kCompressions : compression_types); return 0; @@ -589,7 +589,7 @@ int SSTDumpTool::Run(int argc, char** argv) { std::string out_filename = filename.substr(0, filename.length() - 4); out_filename.append("_dump.txt"); - st = reader.DumpTable(out_filename); + st = dumper.DumpTable(out_filename); if (!st.ok()) { fprintf(stderr, "%s: %s\n", filename.c_str(), st.ToString().c_str()); exit(1); @@ -601,7 +601,7 @@ int SSTDumpTool::Run(int argc, char** argv) { // scan all files in give file path. if (command == "" || command == "scan" || command == "check") { - st = reader.ReadSequential( + st = dumper.ReadSequential( command == "scan", read_num > 0 ? (read_num - total_read) : read_num, has_from || use_from_as_prefix, from_key, has_to, to_key, use_from_as_prefix); @@ -609,14 +609,14 @@ int SSTDumpTool::Run(int argc, char** argv) { fprintf(stderr, "%s: %s\n", filename.c_str(), st.ToString().c_str()); } - total_read += reader.GetReadNumber(); + total_read += dumper.GetReadNumber(); if (read_num > 0 && total_read > read_num) { break; } } if (command == "verify") { - st = reader.VerifyChecksum(); + st = dumper.VerifyChecksum(); if (!st.ok()) { fprintf(stderr, "%s is corrupted: %s\n", filename.c_str(), st.ToString().c_str()); @@ -631,11 +631,11 @@ int SSTDumpTool::Run(int argc, char** argv) { std::shared_ptr table_properties_from_reader; - st = reader.ReadTableProperties(&table_properties_from_reader); + st = dumper.ReadTableProperties(&table_properties_from_reader); if (!st.ok()) { fprintf(stderr, "%s: %s\n", filename.c_str(), st.ToString().c_str()); fprintf(stderr, "Try to use initial table properties\n"); - table_properties = reader.GetInitTableProperties(); + table_properties = dumper.GetInitTableProperties(); } else { table_properties = table_properties_from_reader.get(); } diff --git a/tools/sst_dump_tool_imp.h b/tools/sst_dump_tool_imp.h index a0bad085d..9e83d8d04 100644 --- a/tools/sst_dump_tool_imp.h +++ b/tools/sst_dump_tool_imp.h @@ -15,9 +15,9 @@ namespace rocksdb { -class SstFileReader { +class SstFileDumper { public: - explicit SstFileReader(const std::string& file_name, bool verify_checksum, + explicit SstFileDumper(const std::string& file_name, bool verify_checksum, bool output_hex); Status ReadSequential(bool print_kv, uint64_t read_num, bool has_from,