sst_dump to reduce number of file reads (#6836)

Summary:
sst_dump can issue many file reads to the file system. This doesn't work well with file systems without an OS cache, especially remote file systems. In order to mitigate this problem, several improvements are made:
1. --readahead_size is added, so that users can specify readahead size when scanning the data.
2. Force a 512KB tail readahead, which prevents three I/Os for footer, meta index and property blocks and hopefully index and filter blocks too.
3. Consolidate SSTDump's I/Os before opening the file for read. Use the same file prefetch buffer.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6836

Test Plan: Add a test that covers this new feature.

Reviewed By: pdillinger

Differential Revision: D21516607

fbshipit-source-id: 3ae43526286f67b2f4a5bdedfbc92719d579b87e
main
sdong 6 years ago committed by Facebook GitHub Bot
parent 70aaa9ceeb
commit 4a4b8a1344
  1. 3
      HISTORY.md
  2. 3
      db/convenience.cc
  3. 3
      db/table_cache.cc
  4. 2
      file/random_access_file_reader.cc
  5. 3
      table/block_based/block_based_table_factory.cc
  6. 2
      table/block_based/block_based_table_factory.h
  7. 15
      table/block_based/block_based_table_reader.cc
  8. 9
      table/block_based/block_based_table_reader.h
  9. 17
      table/meta_blocks.cc
  10. 3
      table/meta_blocks.h
  11. 14
      table/table_builder.h
  12. 2
      table/table_test.cc
  13. 3
      tools/ldb_cmd.cc
  14. 42
      tools/sst_dump_test.cc
  15. 121
      tools/sst_dump_tool.cc
  16. 9
      tools/sst_dump_tool_imp.h

@ -16,6 +16,9 @@
* DeleteRange now returns `Status::InvalidArgument` if the range's end key comes before its start key according to the user comparator. Previously the behavior was undefined. * DeleteRange now returns `Status::InvalidArgument` if the range's end key comes before its start key according to the user comparator. Previously the behavior was undefined.
* ldb now uses options.force_consistency_checks = true by default and "--disable_consistency_checks" is added to disable it. * ldb now uses options.force_consistency_checks = true by default and "--disable_consistency_checks" is added to disable it.
### New Feature
* sst_dump to add a new --readahead_size argument. Users can specify read size when scanning the data. Sst_dump also tries to prefetch tail part of the SST files so usually some number of I/Os are saved there too.
## 6.10 (5/2/2020) ## 6.10 (5/2/2020)
### Bug Fixes ### Bug Fixes
* Fix wrong result being read from ingested file. May happen when a key in the file happen to be prefix of another key also in the file. The issue can further cause more data corruption. The issue exists with rocksdb >= 5.0.0 since DB::IngestExternalFile() was introduced. * Fix wrong result being read from ingested file. May happen when a key in the file happen to be prefix of another key also in the file. The issue can further cause more data corruption. The issue exists with rocksdb >= 5.0.0 since DB::IngestExternalFile() was introduced.

@ -61,7 +61,8 @@ Status VerifySstFileChecksum(const Options& options,
s = ioptions.table_factory->NewTableReader( s = ioptions.table_factory->NewTableReader(
TableReaderOptions(ioptions, options.prefix_extractor.get(), env_options, TableReaderOptions(ioptions, options.prefix_extractor.get(), env_options,
internal_comparator, false /* skip_filters */, internal_comparator, false /* skip_filters */,
!kImmortal, -1 /* level */), !kImmortal, false /* force_direct_prefetch */,
-1 /* level */),
std::move(file_reader), file_size, &table_reader, std::move(file_reader), file_size, &table_reader,
false /* prefetch_index_and_filter_in_cache */); false /* prefetch_index_and_filter_in_cache */);
if (!s.ok()) { if (!s.ok()) {

@ -123,7 +123,8 @@ Status TableCache::GetTableReader(
s = ioptions_.table_factory->NewTableReader( s = ioptions_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, prefix_extractor, file_options, TableReaderOptions(ioptions_, prefix_extractor, file_options,
internal_comparator, skip_filters, immortal_tables_, internal_comparator, skip_filters, immortal_tables_,
level, fd.largest_seqno, block_cache_tracer_), false /* force_direct_prefetch */, level,
fd.largest_seqno, block_cache_tracer_),
std::move(file_reader), fd.GetFileSize(), table_reader, std::move(file_reader), fd.GetFileSize(), table_reader,
prefetch_index_and_filter_in_cache); prefetch_index_and_filter_in_cache);
TEST_SYNC_POINT("TableCache::GetTableReader:0"); TEST_SYNC_POINT("TableCache::GetTableReader:0");

@ -27,6 +27,8 @@ Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
AlignedBuf* aligned_buf, AlignedBuf* aligned_buf,
bool for_compaction) const { bool for_compaction) const {
(void)aligned_buf; (void)aligned_buf;
TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
Status s; Status s;
uint64_t elapsed = 0; uint64_t elapsed = 0;
{ {

@ -418,7 +418,8 @@ Status BlockBasedTableFactory::NewTableReader(
file_size, table_reader, table_reader_options.prefix_extractor, file_size, table_reader, table_reader_options.prefix_extractor,
prefetch_index_and_filter_in_cache, table_reader_options.skip_filters, prefetch_index_and_filter_in_cache, table_reader_options.skip_filters,
table_reader_options.level, table_reader_options.immortal, table_reader_options.level, table_reader_options.immortal,
table_reader_options.largest_seqno, &tail_prefetch_stats_, table_reader_options.largest_seqno,
table_reader_options.force_direct_prefetch, &tail_prefetch_stats_,
table_reader_options.block_cache_tracer); table_reader_options.block_cache_tracer);
} }

@ -73,6 +73,8 @@ class BlockBasedTableFactory : public TableFactory {
bool IsDeleteRangeSupported() const override { return true; } bool IsDeleteRangeSupported() const override { return true; }
TailPrefetchStats* tail_prefetch_stats() { return &tail_prefetch_stats_; }
static const std::string kName; static const std::string kName;
private: private:

@ -609,7 +609,8 @@ Status BlockBasedTable::Open(
const SliceTransform* prefix_extractor, const SliceTransform* prefix_extractor,
const bool prefetch_index_and_filter_in_cache, const bool skip_filters, const bool prefetch_index_and_filter_in_cache, const bool skip_filters,
const int level, const bool immortal_table, const int level, const bool immortal_table,
const SequenceNumber largest_seqno, TailPrefetchStats* tail_prefetch_stats, const SequenceNumber largest_seqno, const bool force_direct_prefetch,
TailPrefetchStats* tail_prefetch_stats,
BlockCacheTracer* const block_cache_tracer) { BlockCacheTracer* const block_cache_tracer) {
table_reader->reset(); table_reader->reset();
@ -622,8 +623,9 @@ Status BlockBasedTable::Open(
const bool preload_all = !table_options.cache_index_and_filter_blocks; const bool preload_all = !table_options.cache_index_and_filter_blocks;
if (!ioptions.allow_mmap_reads) { if (!ioptions.allow_mmap_reads) {
s = PrefetchTail(file.get(), file_size, tail_prefetch_stats, prefetch_all, s = PrefetchTail(file.get(), file_size, force_direct_prefetch,
preload_all, &prefetch_buffer); tail_prefetch_stats, prefetch_all, preload_all,
&prefetch_buffer);
} else { } else {
// Should not prefetch for mmap mode. // Should not prefetch for mmap mode.
prefetch_buffer.reset(new FilePrefetchBuffer( prefetch_buffer.reset(new FilePrefetchBuffer(
@ -724,8 +726,8 @@ Status BlockBasedTable::Open(
Status BlockBasedTable::PrefetchTail( Status BlockBasedTable::PrefetchTail(
RandomAccessFileReader* file, uint64_t file_size, RandomAccessFileReader* file, uint64_t file_size,
TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all, bool force_direct_prefetch, TailPrefetchStats* tail_prefetch_stats,
const bool preload_all, const bool prefetch_all, const bool preload_all,
std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer) { std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer) {
size_t tail_prefetch_size = 0; size_t tail_prefetch_size = 0;
if (tail_prefetch_stats != nullptr) { if (tail_prefetch_stats != nullptr) {
@ -755,7 +757,7 @@ Status BlockBasedTable::PrefetchTail(
&tail_prefetch_size); &tail_prefetch_size);
Status s; Status s;
// TODO should not have this special logic in the future. // TODO should not have this special logic in the future.
if (!file->use_direct_io()) { if (!file->use_direct_io() && !force_direct_prefetch) {
prefetch_buffer->reset(new FilePrefetchBuffer( prefetch_buffer->reset(new FilePrefetchBuffer(
nullptr, 0, 0, false /* enable */, true /* track_min_offset */)); nullptr, 0, 0, false /* enable */, true /* track_min_offset */));
s = file->Prefetch(prefetch_off, prefetch_len); s = file->Prefetch(prefetch_off, prefetch_len);
@ -768,7 +770,6 @@ Status BlockBasedTable::PrefetchTail(
return s; return s;
} }
Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno( Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno(
FilePrefetchBuffer* prefetch_buffer, const Slice& handle_value, FilePrefetchBuffer* prefetch_buffer, const Slice& handle_value,
TableProperties** table_properties) { TableProperties** table_properties) {

@ -85,6 +85,8 @@ class BlockBasedTable : public TableReader {
// @param skip_filters Disables loading/accessing the filter block. Overrides // @param skip_filters Disables loading/accessing the filter block. Overrides
// prefetch_index_and_filter_in_cache, so filter will be skipped if both // prefetch_index_and_filter_in_cache, so filter will be skipped if both
// are set. // are set.
// @param force_direct_prefetch if true, always prefetching to RocksDB
// buffer, rather than calling RandomAccessFile::Prefetch().
static Status Open(const ImmutableCFOptions& ioptions, static Status Open(const ImmutableCFOptions& ioptions,
const EnvOptions& env_options, const EnvOptions& env_options,
const BlockBasedTableOptions& table_options, const BlockBasedTableOptions& table_options,
@ -97,6 +99,7 @@ class BlockBasedTable : public TableReader {
bool skip_filters = false, int level = -1, bool skip_filters = false, int level = -1,
const bool immortal_table = false, const bool immortal_table = false,
const SequenceNumber largest_seqno = 0, const SequenceNumber largest_seqno = 0,
bool force_direct_prefetch = false,
TailPrefetchStats* tail_prefetch_stats = nullptr, TailPrefetchStats* tail_prefetch_stats = nullptr,
BlockCacheTracer* const block_cache_tracer = nullptr); BlockCacheTracer* const block_cache_tracer = nullptr);
@ -393,10 +396,12 @@ class BlockBasedTable : public TableReader {
const SliceTransform* prefix_extractor, const SliceTransform* prefix_extractor,
BlockCacheLookupContext* lookup_context) const; BlockCacheLookupContext* lookup_context) const;
// If force_direct_prefetch is true, always prefetching to RocksDB
// buffer, rather than calling RandomAccessFile::Prefetch().
static Status PrefetchTail( static Status PrefetchTail(
RandomAccessFileReader* file, uint64_t file_size, RandomAccessFileReader* file, uint64_t file_size,
TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all, bool force_direct_prefetch, TailPrefetchStats* tail_prefetch_stats,
const bool preload_all, const bool prefetch_all, const bool preload_all,
std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer); std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer);
Status ReadMetaIndexBlock(FilePrefetchBuffer* prefetch_buffer, Status ReadMetaIndexBlock(FilePrefetchBuffer* prefetch_buffer,
std::unique_ptr<Block>* metaindex_block, std::unique_ptr<Block>* metaindex_block,

@ -358,11 +358,12 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
const ImmutableCFOptions& ioptions, const ImmutableCFOptions& ioptions,
TableProperties** properties, TableProperties** properties,
bool compression_type_missing, bool compression_type_missing,
MemoryAllocator* memory_allocator) { MemoryAllocator* memory_allocator,
FilePrefetchBuffer* prefetch_buffer) {
// -- Read metaindex block // -- Read metaindex block
Footer footer; Footer footer;
auto s = ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size, auto s = ReadFooterFromFile(file, prefetch_buffer, file_size, &footer,
&footer, table_magic_number); table_magic_number);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -374,8 +375,8 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
PersistentCacheOptions cache_options; PersistentCacheOptions cache_options;
BlockFetcher block_fetcher( BlockFetcher block_fetcher(
file, nullptr /* prefetch_buffer */, footer, read_options, file, prefetch_buffer, footer, read_options, metaindex_handle,
metaindex_handle, &metaindex_contents, ioptions, false /* decompress */, &metaindex_contents, ioptions, false /* decompress */,
false /*maybe_compressed*/, BlockType::kMetaIndex, false /*maybe_compressed*/, BlockType::kMetaIndex,
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
s = block_fetcher.ReadBlockContents(); s = block_fetcher.ReadBlockContents();
@ -398,10 +399,10 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
TableProperties table_properties; TableProperties table_properties;
if (found_properties_block == true) { if (found_properties_block == true) {
s = ReadProperties( s = ReadProperties(meta_iter->value(), file, prefetch_buffer, footer,
meta_iter->value(), file, nullptr /* prefetch_buffer */, footer,
ioptions, properties, false /* verify_checksum */, ioptions, properties, false /* verify_checksum */,
nullptr /* ret_block_hanel */, nullptr /* ret_block_contents */, nullptr /* ret_block_hanel */,
nullptr /* ret_block_contents */,
compression_type_missing, memory_allocator); compression_type_missing, memory_allocator);
} else { } else {
s = Status::NotFound(); s = Status::NotFound();

@ -121,7 +121,8 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
const ImmutableCFOptions& ioptions, const ImmutableCFOptions& ioptions,
TableProperties** properties, TableProperties** properties,
bool compression_type_missing = false, bool compression_type_missing = false,
MemoryAllocator* memory_allocator = nullptr); MemoryAllocator* memory_allocator = nullptr,
FilePrefetchBuffer* prefetch_buffer = nullptr);
// Find the meta block from the meta index block. // Find the meta block from the meta index block.
Status FindMetaBlock(InternalIterator* meta_index_iter, Status FindMetaBlock(InternalIterator* meta_index_iter,

@ -33,19 +33,20 @@ struct TableReaderOptions {
const EnvOptions& _env_options, const EnvOptions& _env_options,
const InternalKeyComparator& _internal_comparator, const InternalKeyComparator& _internal_comparator,
bool _skip_filters = false, bool _immortal = false, bool _skip_filters = false, bool _immortal = false,
int _level = -1, bool _force_direct_prefetch = false, int _level = -1,
BlockCacheTracer* const _block_cache_tracer = nullptr) BlockCacheTracer* const _block_cache_tracer = nullptr)
: TableReaderOptions(_ioptions, _prefix_extractor, _env_options, : TableReaderOptions(_ioptions, _prefix_extractor, _env_options,
_internal_comparator, _skip_filters, _immortal, _internal_comparator, _skip_filters, _immortal,
_level, 0 /* _largest_seqno */, _force_direct_prefetch, _level,
_block_cache_tracer) {} 0 /* _largest_seqno */, _block_cache_tracer) {}
// @param skip_filters Disables loading/accessing the filter block // @param skip_filters Disables loading/accessing the filter block
TableReaderOptions(const ImmutableCFOptions& _ioptions, TableReaderOptions(const ImmutableCFOptions& _ioptions,
const SliceTransform* _prefix_extractor, const SliceTransform* _prefix_extractor,
const EnvOptions& _env_options, const EnvOptions& _env_options,
const InternalKeyComparator& _internal_comparator, const InternalKeyComparator& _internal_comparator,
bool _skip_filters, bool _immortal, int _level, bool _skip_filters, bool _immortal,
bool _force_direct_prefetch, int _level,
SequenceNumber _largest_seqno, SequenceNumber _largest_seqno,
BlockCacheTracer* const _block_cache_tracer) BlockCacheTracer* const _block_cache_tracer)
: ioptions(_ioptions), : ioptions(_ioptions),
@ -54,6 +55,7 @@ struct TableReaderOptions {
internal_comparator(_internal_comparator), internal_comparator(_internal_comparator),
skip_filters(_skip_filters), skip_filters(_skip_filters),
immortal(_immortal), immortal(_immortal),
force_direct_prefetch(_force_direct_prefetch),
level(_level), level(_level),
largest_seqno(_largest_seqno), largest_seqno(_largest_seqno),
block_cache_tracer(_block_cache_tracer) {} block_cache_tracer(_block_cache_tracer) {}
@ -66,6 +68,10 @@ struct TableReaderOptions {
bool skip_filters; bool skip_filters;
// Whether the table will be valid as long as the DB is open // Whether the table will be valid as long as the DB is open
bool immortal; bool immortal;
// When data prefetching is needed, even if direct I/O is off, read the data
// into RocksDB's own buffer, rather than relying on
// RandomAccessFile::Prefetch().
bool force_direct_prefetch;
// what level this table/file is on, -1 for "not set, don't know" // what level this table/file is on, -1 for "not set, don't know"
int level; int level;
// largest seqno in the table // largest seqno in the table

@ -378,7 +378,7 @@ class TableConstructor: public Constructor {
return ioptions.table_factory->NewTableReader( return ioptions.table_factory->NewTableReader(
TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions, TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions,
internal_comparator, !kSkipFilters, !kImmortal, internal_comparator, !kSkipFilters, !kImmortal,
level_, largest_seqno_, &block_cache_tracer_), false, level_, largest_seqno_, &block_cache_tracer_),
std::move(file_reader_), TEST_GetSink()->contents().size(), std::move(file_reader_), TEST_GetSink()->contents().size(),
&table_reader_); &table_reader_);
} }

@ -3081,7 +3081,8 @@ void DumpSstFile(Options options, std::string filename, bool output_hex,
// no verification // no verification
// TODO: add support for decoding blob indexes in ldb as well // TODO: add support for decoding blob indexes in ldb as well
ROCKSDB_NAMESPACE::SstFileDumper dumper( ROCKSDB_NAMESPACE::SstFileDumper dumper(
options, filename, /* verify_checksum */ false, output_hex, options, filename, 2 * 1024 * 1024 /* readahead_size */,
/* verify_checksum */ false, output_hex,
/* decode_blob_index */ false); /* decode_blob_index */ false);
Status st = dumper.ReadSequential(true, std::numeric_limits<uint64_t>::max(), Status st = dumper.ReadSequential(true, std::numeric_limits<uint64_t>::max(),
false, // has_from false, // has_from

@ -22,7 +22,7 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
const uint32_t optLength = 100; const uint32_t kOptLength = 100;
namespace { namespace {
static std::string MakeKey(int i) { static std::string MakeKey(int i) {
@ -121,11 +121,11 @@ class SSTDumpToolTest : public testing::Test {
void PopulateCommandArgs(const std::string& file_path, const char* command, void PopulateCommandArgs(const std::string& file_path, const char* command,
char* (&usage)[N]) const { char* (&usage)[N]) const {
for (int i = 0; i < static_cast<int>(N); ++i) { for (int i = 0; i < static_cast<int>(N); ++i) {
usage[i] = new char[optLength]; usage[i] = new char[kOptLength];
} }
snprintf(usage[0], optLength, "./sst_dump"); snprintf(usage[0], kOptLength, "./sst_dump");
snprintf(usage[1], optLength, "%s", command); snprintf(usage[1], kOptLength, "%s", command);
snprintf(usage[2], optLength, "--file=%s", file_path.c_str()); snprintf(usage[2], kOptLength, "--file=%s", file_path.c_str());
} }
}; };
@ -254,6 +254,38 @@ TEST_F(SSTDumpToolTest, MemEnv) {
} }
} }
// Checks that running sst_dump's verify command with --readahead_size set
// issues the expected number of RandomAccessFileReader::Read calls, counted
// through the "RandomAccessFileReader::Read" sync point.
TEST_F(SSTDumpToolTest, ReadaheadSize) {
  Options opts;
  opts.env = env();
  std::string file_path = MakeFilePath("rocksdb_sst_test.sst");
  createSST(opts, file_path);

  char* usage[4];
  PopulateCommandArgs(file_path, "--command=verify", usage);
  snprintf(usage[3], kOptLength, "--readahead_size=4000000");

  int num_reads = 0;
  SyncPoint::GetInstance()->SetCallBack("RandomAccessFileReader::Read",
                                        [&](void*) { num_reads++; });
  SyncPoint::GetInstance()->EnableProcessing();

  SSTDumpTool tool;
  // EXPECT_* is used instead of ASSERT_* on purpose: ASSERT_* returns from
  // the test body on failure, which would skip the teardown below. That
  // would leave the sync-point callback (which captures the local
  // `num_reads` by reference) enabled after this function has returned, and
  // would leak the `usage` buffers.
  EXPECT_TRUE(!tool.Run(4, usage, opts));

  // The file is approximately 10MB. Readahead is 4MB.
  // We usually need 3 reads + one metadata read.
  // One extra read is needed before opening the file for metadata.
  EXPECT_EQ(5, num_reads);

  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();

  cleanup(opts, file_path);
  for (int i = 0; i < 4; i++) {
    delete[] usage[i];
  }
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

@ -44,17 +44,19 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
SstFileDumper::SstFileDumper(const Options& options, SstFileDumper::SstFileDumper(const Options& options,
const std::string& file_path, bool verify_checksum, const std::string& file_path,
size_t readahead_size, bool verify_checksum,
bool output_hex, bool decode_blob_index) bool output_hex, bool decode_blob_index)
: file_name_(file_path), : file_name_(file_path),
read_num_(0), read_num_(0),
verify_checksum_(verify_checksum),
output_hex_(output_hex), output_hex_(output_hex),
decode_blob_index_(decode_blob_index), decode_blob_index_(decode_blob_index),
options_(options), options_(options),
ioptions_(options_), ioptions_(options_),
moptions_(ColumnFamilyOptions(options_)), moptions_(ColumnFamilyOptions(options_)),
read_options_(verify_checksum, false),
internal_comparator_(BytewiseComparator()) { internal_comparator_(BytewiseComparator()) {
read_options_.readahead_size = readahead_size;
fprintf(stdout, "Process %s\n", file_path.c_str()); fprintf(stdout, "Process %s\n", file_path.c_str());
init_result_ = GetTableReader(file_name_); init_result_ = GetTableReader(file_name_);
} }
@ -96,9 +98,18 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
file_.reset(new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), file_.reset(new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file),
file_path)); file_path));
FilePrefetchBuffer prefetch_buffer(nullptr, 0, 0, true /* enable */,
false /* track_min_offset */);
const uint64_t kSstDumpTailPrefetchSize = 512 * 1024;
uint64_t prefetch_size = (file_size > kSstDumpTailPrefetchSize)
? kSstDumpTailPrefetchSize
: file_size;
uint64_t prefetch_off = file_size - prefetch_size;
prefetch_buffer.Prefetch(file_.get(), prefetch_off,
static_cast<size_t>(prefetch_size));
if (s.ok()) { if (s.ok()) {
s = ReadFooterFromFile(file_.get(), nullptr /* prefetch_buffer */, s = ReadFooterFromFile(file_.get(), &prefetch_buffer, file_size, &footer);
file_size, &footer);
} }
if (s.ok()) { if (s.ok()) {
magic_number = footer.table_magic_number(); magic_number = footer.table_magic_number();
@ -114,7 +125,11 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
} }
options_.comparator = &internal_comparator_; options_.comparator = &internal_comparator_;
// For old sst format, ReadTableProperties might fail but file can be read // For old sst format, ReadTableProperties might fail but file can be read
if (ReadTableProperties(magic_number, file_.get(), file_size).ok()) { if (ReadTableProperties(magic_number, file_.get(), file_size,
(magic_number == kBlockBasedTableMagicNumber)
? &prefetch_buffer
: nullptr)
.ok()) {
SetTableOptionsByMagicNumber(magic_number); SetTableOptionsByMagicNumber(magic_number);
} else { } else {
SetOldTableOptions(); SetOldTableOptions();
@ -132,8 +147,10 @@ Status SstFileDumper::NewTableReader(
const ImmutableCFOptions& /*ioptions*/, const EnvOptions& /*soptions*/, const ImmutableCFOptions& /*ioptions*/, const EnvOptions& /*soptions*/,
const InternalKeyComparator& /*internal_comparator*/, uint64_t file_size, const InternalKeyComparator& /*internal_comparator*/, uint64_t file_size,
std::unique_ptr<TableReader>* /*table_reader*/) { std::unique_ptr<TableReader>* /*table_reader*/) {
auto t_opt = TableReaderOptions(ioptions_, moptions_.prefix_extractor.get(), auto t_opt =
soptions_, internal_comparator_); TableReaderOptions(ioptions_, moptions_.prefix_extractor.get(), soptions_,
internal_comparator_, false /* skip_filters */,
false /* imortal */, true /* force_direct_prefetch */);
// Allow open file with global sequence number for backward compatibility. // Allow open file with global sequence number for backward compatibility.
t_opt.largest_seqno = kMaxSequenceNumber; t_opt.largest_seqno = kMaxSequenceNumber;
@ -152,7 +169,7 @@ Status SstFileDumper::NewTableReader(
Status SstFileDumper::VerifyChecksum() { Status SstFileDumper::VerifyChecksum() {
// We could pass specific readahead setting into read options if needed. // We could pass specific readahead setting into read options if needed.
return table_reader_->VerifyChecksum(ReadOptions(), return table_reader_->VerifyChecksum(read_options_,
TableReaderCaller::kSSTDumpTool); TableReaderCaller::kSSTDumpTool);
} }
@ -184,7 +201,7 @@ uint64_t SstFileDumper::CalculateCompressedTableSize(
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
dest_writer.get())); dest_writer.get()));
std::unique_ptr<InternalIterator> iter(table_reader_->NewIterator( std::unique_ptr<InternalIterator> iter(table_reader_->NewIterator(
ReadOptions(), moptions_.prefix_extractor.get(), /*arena=*/nullptr, read_options_, moptions_.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kSSTDumpTool)); /*skip_filters=*/false, TableReaderCaller::kSSTDumpTool));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
table_builder->Add(iter->key(), iter->value()); table_builder->Add(iter->key(), iter->value());
@ -234,7 +251,6 @@ int SstFileDumper::ShowCompressionSize(
size_t block_size, size_t block_size,
CompressionType compress_type, CompressionType compress_type,
const CompressionOptions& compress_opt) { const CompressionOptions& compress_opt) {
ReadOptions read_options;
Options opts; Options opts;
opts.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); opts.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
opts.statistics->set_stats_level(StatsLevel::kAll); opts.statistics->set_stats_level(StatsLevel::kAll);
@ -301,10 +317,13 @@ int SstFileDumper::ShowCompressionSize(
Status SstFileDumper::ReadTableProperties(uint64_t table_magic_number, Status SstFileDumper::ReadTableProperties(uint64_t table_magic_number,
RandomAccessFileReader* file, RandomAccessFileReader* file,
uint64_t file_size) { uint64_t file_size,
FilePrefetchBuffer* prefetch_buffer) {
TableProperties* table_properties = nullptr; TableProperties* table_properties = nullptr;
Status s = ROCKSDB_NAMESPACE::ReadTableProperties( Status s = ROCKSDB_NAMESPACE::ReadTableProperties(
file, file_size, table_magic_number, ioptions_, &table_properties); file, file_size, table_magic_number, ioptions_, &table_properties,
/* compression_type_missing= */ false,
/* memory_allocator= */ nullptr, prefetch_buffer);
if (s.ok()) { if (s.ok()) {
table_properties_.reset(table_properties); table_properties_.reset(table_properties);
} else { } else {
@ -318,8 +337,16 @@ Status SstFileDumper::SetTableOptionsByMagicNumber(
assert(table_properties_); assert(table_properties_);
if (table_magic_number == kBlockBasedTableMagicNumber || if (table_magic_number == kBlockBasedTableMagicNumber ||
table_magic_number == kLegacyBlockBasedTableMagicNumber) { table_magic_number == kLegacyBlockBasedTableMagicNumber) {
options_.table_factory = std::make_shared<BlockBasedTableFactory>(); BlockBasedTableFactory* bbtf = new BlockBasedTableFactory();
// To force tail prefetching, we fake reporting two useful reads of 512KB
// from the tail.
// It needs at least two data points to warm up the stats.
bbtf->tail_prefetch_stats()->RecordEffectiveSize(512 * 1024);
bbtf->tail_prefetch_stats()->RecordEffectiveSize(512 * 1024);
options_.table_factory.reset(bbtf);
fprintf(stdout, "Sst file format: block-based\n"); fprintf(stdout, "Sst file format: block-based\n");
auto& props = table_properties_->user_collected_properties; auto& props = table_properties_->user_collected_properties;
auto pos = props.find(BlockBasedTablePropertyNames::kIndexType); auto pos = props.find(BlockBasedTablePropertyNames::kIndexType);
if (pos != props.end()) { if (pos != props.end()) {
@ -373,7 +400,7 @@ Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num,
} }
InternalIterator* iter = table_reader_->NewIterator( InternalIterator* iter = table_reader_->NewIterator(
ReadOptions(verify_checksum_, false), moptions_.prefix_extractor.get(), read_options_, moptions_.prefix_extractor.get(),
/*arena=*/nullptr, /*skip_filters=*/false, /*arena=*/nullptr, /*skip_filters=*/false,
TableReaderCaller::kSSTDumpTool); TableReaderCaller::kSSTDumpTool);
uint64_t i = 0; uint64_t i = 0;
@ -518,6 +545,24 @@ void print_help() {
)"); )");
} }
// Tries to parse `arg` as an integer-valued command line option.
//
// arg_name must include the full prefix, e.g. "--my_arg=".
// arg_val receives the parsed value when `arg` starts with arg_name.
// Returns true if `arg` starts with arg_name, false otherwise (in which case
// *arg_val is left untouched).
// Prints err_msg to stderr and exits with status 1 if `arg` matches arg_name
// but the text after the prefix is not an integer. Trailing characters after
// the number (e.g. "--my_arg=4MB") are rejected rather than silently dropped.
bool ParseIntArg(const char* arg, const std::string& arg_name,
                 const std::string& err_msg, int64_t* arg_val) {
  if (strncmp(arg, arg_name.c_str(), arg_name.size()) != 0) {
    return false;
  }
  std::istringstream iss(arg + arg_name.size());
  iss >> *arg_val;
  // A successful extraction must also have consumed the whole value: either
  // the stream already hit end-of-input, or nothing is left to peek at.
  const bool parsed_ok =
      !iss.fail() &&
      (iss.eof() || iss.peek() == std::istringstream::traits_type::eof());
  if (!parsed_ok) {
    fprintf(stderr, "%s\n", err_msg.c_str());
    exit(1);
  }
  return true;
}
} // namespace } // namespace
int SSTDumpTool::Run(int argc, char** argv, Options options) { int SSTDumpTool::Run(int argc, char** argv, Options options) {
@ -547,6 +592,7 @@ int SSTDumpTool::Run(int argc, char** argv, Options options) {
std::string compression_level_from_str; std::string compression_level_from_str;
std::string compression_level_to_str; std::string compression_level_to_str;
size_t block_size = 0; size_t block_size = 0;
size_t readahead_size = 2 * 1024 * 1024;
std::vector<std::pair<CompressionType, const char*>> compression_types; std::vector<std::pair<CompressionType, const char*>> compression_types;
uint64_t total_num_files = 0; uint64_t total_num_files = 0;
uint64_t total_num_data_blocks = 0; uint64_t total_num_data_blocks = 0;
@ -555,6 +601,9 @@ int SSTDumpTool::Run(int argc, char** argv, Options options) {
uint64_t total_filter_block_size = 0; uint64_t total_filter_block_size = 0;
int32_t compress_level_from = CompressionOptions::kDefaultCompressionLevel; int32_t compress_level_from = CompressionOptions::kDefaultCompressionLevel;
int32_t compress_level_to = CompressionOptions::kDefaultCompressionLevel; int32_t compress_level_to = CompressionOptions::kDefaultCompressionLevel;
int64_t tmp_val;
for (int i = 1; i < argc; i++) { for (int i = 1; i < argc; i++) {
if (strncmp(argv[i], "--env_uri=", 10) == 0) { if (strncmp(argv[i], "--env_uri=", 10) == 0) {
env_uri = argv[i] + 10; env_uri = argv[i] + 10;
@ -586,15 +635,13 @@ int SSTDumpTool::Run(int argc, char** argv, Options options) {
show_properties = true; show_properties = true;
} else if (strcmp(argv[i], "--show_summary") == 0) { } else if (strcmp(argv[i], "--show_summary") == 0) {
show_summary = true; show_summary = true;
} else if (strncmp(argv[i], "--set_block_size=", 17) == 0) { } else if (ParseIntArg(argv[i], "--set_block_size=",
"block size must be numeric", &tmp_val)) {
set_block_size = true; set_block_size = true;
block_size_str = argv[i] + 17; block_size = static_cast<size_t>(tmp_val);
std::istringstream iss(block_size_str); } else if (ParseIntArg(argv[i], "--readahead_size=",
iss >> block_size; "readahead_size must be numeric", &tmp_val)) {
if (iss.fail()) { readahead_size = static_cast<size_t>(tmp_val);
fprintf(stderr, "block size must be numeric\n");
exit(1);
}
} else if (strncmp(argv[i], "--compression_types=", 20) == 0) { } else if (strncmp(argv[i], "--compression_types=", 20) == 0) {
std::string compression_types_csv = argv[i] + 20; std::string compression_types_csv = argv[i] + 20;
std::istringstream iss(compression_types_csv); std::istringstream iss(compression_types_csv);
@ -633,25 +680,16 @@ int SSTDumpTool::Run(int argc, char** argv, Options options) {
} }
fprintf(stdout, "key=%s\n", ikey.DebugString(true).c_str()); fprintf(stdout, "key=%s\n", ikey.DebugString(true).c_str());
return retc; return retc;
} else if (strncmp(argv[i], "--compression_level_from=", 25) == 0) { } else if (ParseIntArg(argv[i], "--compression_level_from=",
compression_level_from_str = argv[i] + 25; "compression_level_from must be numeric",
&tmp_val)) {
has_compression_level_from = true; has_compression_level_from = true;
std::istringstream iss(compression_level_from_str); compress_level_from = static_cast<int>(tmp_val);
iss >> compress_level_from; } else if (ParseIntArg(argv[i], "--compression_level_to=",
if (iss.fail()) { "compression_level_to must be numeric", &tmp_val)) {
fprintf(stderr, "compression_level_from must be numeric\n");
exit(1);
}
} else if (strncmp(argv[i], "--compression_level_to=", 22) == 0) {
compression_level_to_str = argv[i]+23 ;
has_compression_level_to = true; has_compression_level_to = true;
std::istringstream iss(compression_level_to_str); compress_level_to = static_cast<int>(tmp_val);
iss >> compress_level_to; } else {
if (iss.fail()) {
fprintf(stderr, "compression_level_to must be numeric\n");
exit(1);
}
}else {
fprintf(stderr, "Unrecognized argument '%s'\n\n", argv[i]); fprintf(stderr, "Unrecognized argument '%s'\n\n", argv[i]);
print_help(); print_help();
exit(1); exit(1);
@ -732,8 +770,9 @@ int SSTDumpTool::Run(int argc, char** argv, Options options) {
filename = std::string(dir_or_file) + "/" + filename; filename = std::string(dir_or_file) + "/" + filename;
} }
ROCKSDB_NAMESPACE::SstFileDumper dumper(options, filename, verify_checksum, ROCKSDB_NAMESPACE::SstFileDumper dumper(options, filename, readahead_size,
output_hex, decode_blob_index); verify_checksum, output_hex,
decode_blob_index);
if (!dumper.getStatus().ok()) { if (!dumper.getStatus().ok()) {
fprintf(stderr, "%s: %s\n", filename.c_str(), fprintf(stderr, "%s: %s\n", filename.c_str(),
dumper.getStatus().ToString().c_str()); dumper.getStatus().ToString().c_str());

@ -18,8 +18,8 @@ namespace ROCKSDB_NAMESPACE {
class SstFileDumper { class SstFileDumper {
public: public:
explicit SstFileDumper(const Options& options, const std::string& file_name, explicit SstFileDumper(const Options& options, const std::string& file_name,
bool verify_checksum, bool output_hex, size_t readahead_size, bool verify_checksum,
bool decode_blob_index); bool output_hex, bool decode_blob_index);
Status ReadSequential(bool print_kv, uint64_t read_num, bool has_from, Status ReadSequential(bool print_kv, uint64_t read_num, bool has_from,
const std::string& from_key, bool has_to, const std::string& from_key, bool has_to,
@ -51,7 +51,8 @@ class SstFileDumper {
// Get the TableReader implementation for the sst file // Get the TableReader implementation for the sst file
Status GetTableReader(const std::string& file_path); Status GetTableReader(const std::string& file_path);
Status ReadTableProperties(uint64_t table_magic_number, Status ReadTableProperties(uint64_t table_magic_number,
RandomAccessFileReader* file, uint64_t file_size); RandomAccessFileReader* file, uint64_t file_size,
FilePrefetchBuffer* prefetch_buffer);
uint64_t CalculateCompressedTableSize(const TableBuilderOptions& tb_options, uint64_t CalculateCompressedTableSize(const TableBuilderOptions& tb_options,
size_t block_size, size_t block_size,
@ -70,7 +71,6 @@ class SstFileDumper {
std::string file_name_; std::string file_name_;
uint64_t read_num_; uint64_t read_num_;
bool verify_checksum_;
bool output_hex_; bool output_hex_;
bool decode_blob_index_; bool decode_blob_index_;
EnvOptions soptions_; EnvOptions soptions_;
@ -85,6 +85,7 @@ class SstFileDumper {
const ImmutableCFOptions ioptions_; const ImmutableCFOptions ioptions_;
const MutableCFOptions moptions_; const MutableCFOptions moptions_;
ReadOptions read_options_;
InternalKeyComparator internal_comparator_; InternalKeyComparator internal_comparator_;
std::unique_ptr<TableProperties> table_properties_; std::unique_ptr<TableProperties> table_properties_;
}; };

Loading…
Cancel
Save