From 40497a875aa3a73399bb0badb798cdee0c8fbe40 Mon Sep 17 00:00:00 2001
From: Cheng Chang
Date: Fri, 24 Apr 2020 15:30:12 -0700
Subject: [PATCH] Reduce memory copies when fetching and uncompressing blocks
 from SST files (#6689)

Summary:
In https://github.com/facebook/rocksdb/pull/6455, we modified the interface of `RandomAccessFileReader::Read` to be able to get rid of memcpy in direct IO mode. This PR applies the new interface to `BlockFetcher` when reading blocks from SST files in direct IO mode.

Without this PR, in direct IO mode, when fetching and uncompressing compressed blocks, `BlockFetcher` will first copy the raw compressed block into `BlockFetcher::compressed_buf_` or `BlockFetcher::stack_buf_` inside `RandomAccessFileReader::Read`, depending on the block size. Then, during uncompression, it will copy the uncompressed block into `BlockFetcher::heap_buf_`.

In this PR, we get rid of the first memcpy and directly uncompress the block from `direct_io_buf_` to `heap_buf_`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6689

Test Plan: A new unit test `block_fetcher_test` is added.

Reviewed By: anand1976

Differential Revision: D21006729

Pulled By: cheng-chang

fbshipit-source-id: 2370b92c24075692423b81277415feb2aed5d980
---
 CMakeLists.txt                         |   1 +
 HISTORY.md                             |   3 +
 Makefile                               |   4 +
 TARGETS                                |   7 +
 file/random_access_file_reader.h       |   2 +-
 file/random_access_file_reader_test.cc |  15 +-
 options/options_helper.h               |   2 +
 src.mk                                 |   1 +
 table/block_fetcher.cc                 |  93 +++--
 table/block_fetcher.h                  |  34 +-
 table/block_fetcher_test.cc            | 475 +++++++++++++++++++++++++
 table/format.cc                        |   2 +-
 test_util/testutil.cc                  |  13 +
 test_util/testutil.h                   |   5 +
 14 files changed, 609 insertions(+), 48 deletions(-)
 create mode 100644 table/block_fetcher_test.cc

diff --git a/CMakeLists.txt b/CMakeLists.txt
index bcb2c0c8f..bf99be5e0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1063,6 +1063,7 @@ if(WITH_TESTS)
         table/merger_test.cc
         table/sst_file_reader_test.cc
         table/table_test.cc
+        table/block_fetcher_test.cc
         tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc
         tools/ldb_cmd_test.cc
         tools/reduce_levels_test.cc
diff --git a/HISTORY.md b/HISTORY.md
index d88a0a13c..3bb3f7826 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -19,6 +19,9 @@
 * Fix a bug when making options.bottommost_compression, options.compression_opts and options.bottommost_compression_opts dynamically changeable: the modified values are not written to option files or returned back to users when being queried.
 * Fix a bug where index key comparisons were unaccounted in `PerfContext::user_key_comparison_count` for lookups in files written with `format_version >= 3`.
 
+### Performance Improvements
+* Reduced memory copies when fetching and uncompressing compressed blocks from sst files.
+
 ## 6.9.0 (03/29/2020)
 ### Behavior changes
 * Since RocksDB 6.8, ttl-based FIFO compaction can drop a file whose oldest key becomes older than options.ttl while others have not. This fix reverts this and makes ttl-based FIFO compaction use the file's flush time as the criterion. This fix also requires that max_open_files = -1 and compaction_options_fifo.allow_compaction = false to function properly.
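For context, the two read paths that `BlockFetcher` chooses between after this change can be sketched as follows. This is a minimal sketch, not the actual RocksDB implementation: `ReadSketch` and the simplified `Slice` struct are hypothetical stand-ins, while `AlignedBuf` mirrors the alias declared in file/random_access_file_reader.h below.

    // Sketch of the direct-IO vs. buffered read paths assumed by this PR.
    #include <cstddef>
    #include <memory>

    using AlignedBuf = std::unique_ptr<char[]>;  // as in random_access_file_reader.h

    struct Slice {  // simplified stand-in for rocksdb::Slice
      const char* data = nullptr;
      size_t size = 0;
    };

    void ReadSketch(bool use_direct_io, size_t n, char* scratch, Slice* result,
                    AlignedBuf* direct_io_buf) {
      if (use_direct_io) {
        // Direct IO path: the reader hands its (aligned) buffer back to the
        // caller instead of copying the block into `scratch`, so BlockFetcher
        // can uncompress straight from this buffer into heap_buf_.
        direct_io_buf->reset(new char[n]);  // aligned allocation in real code
        // ... aligned pread into direct_io_buf->get() ...
        *result = {direct_io_buf->get(), n};
      } else {
        // Buffered path (unchanged): the block is read into the caller's
        // scratch buffer (stack_buf_, heap_buf_, or compressed_buf_).
        // ... read into scratch ...
        *result = {scratch, n};
      }
    }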
diff --git a/Makefile b/Makefile
index a258819f8..616f37d08 100644
--- a/Makefile
+++ b/Makefile
@@ -566,6 +566,7 @@ TESTS = \
 	deletefile_test \
 	obsolete_files_test \
 	table_test \
+	block_fetcher_test \
 	delete_scheduler_test \
 	options_test \
 	options_settable_test \
@@ -1551,6 +1552,9 @@ cleanable_test: table/cleanable_test.o $(LIBOBJECTS) $(TESTHARNESS)
 table_test: table/table_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
+block_fetcher_test: table/block_fetcher_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
+
 block_test: table/block_based/block_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
diff --git a/TARGETS b/TARGETS
index f9b998c05..d247448ee 100644
--- a/TARGETS
+++ b/TARGETS
@@ -528,6 +528,13 @@ ROCKS_TESTS = [
         [],
         [],
     ],
+    [
+        "block_fetcher_test",
+        "table/block_fetcher_test.cc",
+        "serial",
+        [],
+        [],
+    ],
     [
         "block_test",
         "table/block_based/block_test.cc",
diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h
index e97cd8c2a..1cdc388fb 100644
--- a/file/random_access_file_reader.h
+++ b/file/random_access_file_reader.h
@@ -22,7 +22,7 @@ namespace ROCKSDB_NAMESPACE {
 class Statistics;
 class HistogramImpl;
 
-using AlignedBuf = std::unique_ptr<char[]>;
 
 // RandomAccessFileReader is a wrapper on top of Env::RandomAccessFile. It is
 // responsible for:
diff --git a/file/random_access_file_reader_test.cc b/file/random_access_file_reader_test.cc
index 04b135693..03f9da59d 100644
--- a/file/random_access_file_reader_test.cc
+++ b/file/random_access_file_reader_test.cc
@@ -15,20 +15,7 @@ namespace ROCKSDB_NAMESPACE {
 class RandomAccessFileReaderTest : public testing::Test {
  public:
   void SetUp() override {
-#ifdef OS_LINUX
-    // TEST_TMPDIR may be set to /dev/shm in Makefile,
-    // but /dev/shm does not support direct IO.
-    // The default TEST_TMPDIR is under /tmp, but /tmp might also be a tmpfs
-    // which does not support direct IO neither.
-    unsetenv("TEST_TMPDIR");
-    char* tmpdir = getenv("DISK_TEMP_DIR");
-    if (tmpdir == nullptr) {
-      tmpdir = getenv("HOME");
-    }
-    if (tmpdir != nullptr) {
-      setenv("TEST_TMPDIR", tmpdir, 1);
-    }
-#endif
+    test::ResetTmpDirForDirectIO();
     env_ = Env::Default();
     fs_ = FileSystem::Default();
     test_dir_ = test::PerThreadDBPath("random_access_file_reader_test");
diff --git a/options/options_helper.h b/options/options_helper.h
index 399ba1034..f5edc1b84 100644
--- a/options/options_helper.h
+++ b/options/options_helper.h
@@ -21,6 +21,8 @@ namespace ROCKSDB_NAMESPACE {
 struct ConfigOptions;
 
+std::vector<CompressionType> GetSupportedCompressions();
+
 DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
                          const MutableDBOptions& mutable_db_options);
diff --git a/src.mk b/src.mk
index a17e72949..5fc865607 100644
--- a/src.mk
+++ b/src.mk
@@ -426,6 +426,7 @@ MAIN_SOURCES = \
   table/sst_file_reader_test.cc \
   table/table_reader_bench.cc \
   table/table_test.cc \
+  table/block_fetcher_test.cc \
   third-party/gtest-1.7.0/fused-src/gtest/gtest-all.cc \
   tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc \
   tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc \
diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc
index d1b09a53d..61c80cb9e 100644
--- a/table/block_fetcher.cc
+++ b/table/block_fetcher.cc
@@ -90,10 +90,8 @@ inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
 inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
   if (prefetch_buffer_ != nullptr &&
       prefetch_buffer_->TryReadFromCache(
-          handle_.offset(),
-          static_cast<size_t>(handle_.size()) + kBlockTrailerSize, &slice_,
+          handle_.offset(), block_size_with_trailer_, &slice_,
           for_compaction_)) {
-    block_size_ = static_cast<size_t>(handle_.size());
     CheckBlockChecksum();
     if (!status_.ok()) {
       return true;
@@ -110,7 +108,7 @@ inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
     // lookup uncompressed cache mode p-cache
     std::unique_ptr<char[]> raw_data;
     status_ = PersistentCacheHelper::LookupRawPage(
-        cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
+        cache_options_, handle_, &raw_data, block_size_with_trailer_);
     if (status_.ok()) {
       heap_buf_ = CacheAllocationPtr(raw_data.release());
       used_buf_ = heap_buf_.get();
@@ -129,17 +127,17 @@ inline void BlockFetcher::PrepareBufferForBlockFromFile() {
   // cache miss read from device
   if (do_uncompress_ &&
-      block_size_ + kBlockTrailerSize < kDefaultStackBufferSize) {
+      block_size_with_trailer_ < kDefaultStackBufferSize) {
     // If we've got a small enough hunk of data, read it into the
     // trivially allocated stack buffer instead of needing a full malloc()
     used_buf_ = &stack_buf_[0];
   } else if (maybe_compressed_ && !do_uncompress_) {
-    compressed_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize,
+    compressed_buf_ = AllocateBlock(block_size_with_trailer_,
                                     memory_allocator_compressed_);
     used_buf_ = compressed_buf_.get();
   } else {
     heap_buf_ =
-        AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
+        AllocateBlock(block_size_with_trailer_, memory_allocator_);
     used_buf_ = heap_buf_.get();
   }
 }
@@ -150,7 +148,7 @@ inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
       cache_options_.persistent_cache->IsCompressed()) {
     // insert to raw cache
     PersistentCacheHelper::InsertRawPage(cache_options_, handle_, used_buf_,
-                                         block_size_ + kBlockTrailerSize);
+                                         block_size_with_trailer_);
   }
 }
 
@@ -164,12 +162,35 @@ inline void
 BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
   }
 }
 
-inline void BlockFetcher::CopyBufferToHeap() {
+inline void BlockFetcher::CopyBufferToHeapBuf() {
   assert(used_buf_ != heap_buf_.get());
-  heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
-  memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
+  heap_buf_ = AllocateBlock(block_size_with_trailer_, memory_allocator_);
+  memcpy(heap_buf_.get(), used_buf_, block_size_with_trailer_);
+#ifndef NDEBUG
+  num_heap_buf_memcpy_++;
+#endif
+}
+
+inline void BlockFetcher::CopyBufferToCompressedBuf() {
+  assert(used_buf_ != compressed_buf_.get());
+  compressed_buf_ = AllocateBlock(block_size_with_trailer_,
+                                  memory_allocator_compressed_);
+  memcpy(compressed_buf_.get(), used_buf_, block_size_with_trailer_);
+#ifndef NDEBUG
+  num_compressed_buf_memcpy_++;
+#endif
 }
+
+// Entering this method means the block is not compressed or does not need to
+// be uncompressed. The block can be in one of the following buffers:
+// 1. prefetch buffer if prefetch is enabled and the block is prefetched before
+// 2. stack_buf_ if block size is smaller than the stack_buf_ size and block
+//    is not compressed
+// 3. heap_buf_ if the block is not compressed
+// 4. compressed_buf_ if the block is compressed
+// 5. direct_io_buf_ if direct IO is enabled
+// After this method, if the block is compressed, it should be in
+// compressed_buf_, otherwise it should be in heap_buf_.
 inline void BlockFetcher::GetBlockContents() {
   if (slice_.data() != used_buf_) {
     // the slice content is not the buffer provided
@@ -178,14 +199,21 @@ inline void BlockFetcher::GetBlockContents() {
     // page can be either uncompressed or compressed, the buffer either stack
     // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
     if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
-      CopyBufferToHeap();
+      CopyBufferToHeapBuf();
     } else if (used_buf_ == compressed_buf_.get()) {
       if (compression_type_ == kNoCompression &&
           memory_allocator_ != memory_allocator_compressed_) {
-        CopyBufferToHeap();
+        CopyBufferToHeapBuf();
       } else {
         heap_buf_ = std::move(compressed_buf_);
       }
+    } else if (direct_io_buf_.get() != nullptr) {
+      if (compression_type_ == kNoCompression) {
+        CopyBufferToHeapBuf();
+      } else {
+        CopyBufferToCompressedBuf();
+        heap_buf_ = std::move(compressed_buf_);
+      }
     }
     *contents_ = BlockContents(std::move(heap_buf_), block_size_);
   }
@@ -195,8 +223,6 @@ inline void BlockFetcher::GetBlockContents() {
 }
 
 Status BlockFetcher::ReadBlockContents() {
-  block_size_ = static_cast<size_t>(handle_.size());
-
   if (TryGetUncompressBlockFromPersistentCache()) {
     compression_type_ = kNoCompression;
 #ifndef NDEBUG
@@ -209,16 +235,30 @@ Status BlockFetcher::ReadBlockContents() {
       return status_;
     }
   } else if (!TryGetCompressedBlockFromPersistentCache()) {
-    PrepareBufferForBlockFromFile();
-    Status s;
-
-    {
-      PERF_TIMER_GUARD(block_read_time);  // Actual file read
-      status_ = file_->Read(handle_.offset(), block_size_ + kBlockTrailerSize,
+    if (file_->use_direct_io()) {
+      PERF_TIMER_GUARD(block_read_time);
+      status_ =
+          file_->Read(handle_.offset(), block_size_with_trailer_,
+                      &slice_, nullptr, &direct_io_buf_, for_compaction_);
+      PERF_COUNTER_ADD(block_read_count, 1);
+      used_buf_ = const_cast<char*>(slice_.data());
+    } else {
+      PrepareBufferForBlockFromFile();
+      PERF_TIMER_GUARD(block_read_time);
+      status_ = file_->Read(handle_.offset(), block_size_with_trailer_,
                             &slice_, used_buf_, nullptr, for_compaction_);
+      PERF_COUNTER_ADD(block_read_count, 1);
+#ifndef NDEBUG
+      if (used_buf_ == &stack_buf_[0]) {
+        num_stack_buf_memcpy_++;
+      } else if (used_buf_ == heap_buf_.get()) {
+        num_heap_buf_memcpy_++;
+      } else if (used_buf_ == compressed_buf_.get()) {
+        num_compressed_buf_memcpy_++;
+      }
+#endif
     }
-    PERF_COUNTER_ADD(block_read_count, 1);
 
     // TODO: introduce dedicated perf counter for range tombstones
     switch (block_type_) {
@@ -239,16 +279,16 @@ Status BlockFetcher::ReadBlockContents() {
         break;
     }
 
-    PERF_COUNTER_ADD(block_read_byte, block_size_ + kBlockTrailerSize);
+    PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
     if (!status_.ok()) {
       return status_;
     }
 
-    if (slice_.size() != block_size_ + kBlockTrailerSize) {
+    if (slice_.size() != block_size_with_trailer_) {
       return Status::Corruption("truncated block read from " +
                                 file_->file_name() + " offset " +
                                 ToString(handle_.offset()) + ", expected " +
-                                ToString(block_size_ + kBlockTrailerSize) +
+                                ToString(block_size_with_trailer_) +
                                 " bytes, got " + ToString(slice_.size()));
     }
 
@@ -270,6 +310,9 @@ Status BlockFetcher::ReadBlockContents() {
       status_ = UncompressBlockContents(info, slice_.data(), block_size_,
                                         contents_, footer_.version(), ioptions_,
                                         memory_allocator_);
+#ifndef NDEBUG
+      num_heap_buf_memcpy_++;
+#endif
       compression_type_ = kNoCompression;
     } else {
       GetBlockContents();
diff --git a/table/block_fetcher.h b/table/block_fetcher.h
index 1b003df15..c03352e98 100644
--- a/table/block_fetcher.h
+++ b/table/block_fetcher.h
@@ -56,6 +56,8 @@ class BlockFetcher {
         do_uncompress_(do_uncompress),
         maybe_compressed_(maybe_compressed),
         block_type_(block_type),
+        block_size_(static_cast<size_t>(handle_.size())),
+        block_size_with_trailer_(block_size(handle_)),
         uncompression_dict_(uncompression_dict),
         cache_options_(cache_options),
         memory_allocator_(memory_allocator),
@@ -65,7 +67,21 @@ class BlockFetcher {
   Status ReadBlockContents();
 
   CompressionType get_compression_type() const { return compression_type_; }
+#ifndef NDEBUG
+  int TEST_GetNumStackBufMemcpy() const { return num_stack_buf_memcpy_; }
+  int TEST_GetNumHeapBufMemcpy() const { return num_heap_buf_memcpy_; }
+  int TEST_GetNumCompressedBufMemcpy() const {
+    return num_compressed_buf_memcpy_;
+  }
+
+#endif
  private:
+#ifndef NDEBUG
+  int num_stack_buf_memcpy_ = 0;
+  int num_heap_buf_memcpy_ = 0;
+  int num_compressed_buf_memcpy_ = 0;
+
+#endif
   static const uint32_t kDefaultStackBufferSize = 5000;
 
   RandomAccessFileReader* file_;
@@ -75,9 +91,11 @@ class BlockFetcher {
   const BlockHandle& handle_;
   BlockContents* contents_;
   const ImmutableCFOptions& ioptions_;
-  bool do_uncompress_;
-  bool maybe_compressed_;
-  BlockType block_type_;
+  const bool do_uncompress_;
+  const bool maybe_compressed_;
+  const BlockType block_type_;
+  const size_t block_size_;
+  const size_t block_size_with_trailer_;
   const UncompressionDict& uncompression_dict_;
   const PersistentCacheOptions& cache_options_;
   MemoryAllocator* memory_allocator_;
@@ -85,12 +103,12 @@ class BlockFetcher {
   Status status_;
   Slice slice_;
   char* used_buf_ = nullptr;
-  size_t block_size_;
+  AlignedBuf direct_io_buf_;
   CacheAllocationPtr heap_buf_;
   CacheAllocationPtr compressed_buf_;
   char stack_buf_[kDefaultStackBufferSize];
   bool got_from_prefetch_buffer_ = false;
-  ROCKSDB_NAMESPACE::CompressionType compression_type_;
+  CompressionType compression_type_;
   bool for_compaction_ = false;
 
   // return true if found
@@ -99,8 +117,10 @@ class BlockFetcher {
   bool TryGetFromPrefetchBuffer();
   bool TryGetCompressedBlockFromPersistentCache();
   void PrepareBufferForBlockFromFile();
-  // Copy content from used_buf_ to new heap buffer.
-  void CopyBufferToHeap();
+  // Copy content from used_buf_ to new heap_buf_.
+  void CopyBufferToHeapBuf();
+  // Copy content from used_buf_ to new compressed_buf_.
+  void CopyBufferToCompressedBuf();
   void GetBlockContents();
   void InsertCompressedBlockToPersistentCacheIfNeeded();
   void InsertUncompressedBlockToPersistentCacheIfNeeded();
diff --git a/table/block_fetcher_test.cc b/table/block_fetcher_test.cc
new file mode 100644
index 000000000..64655639f
--- /dev/null
+++ b/table/block_fetcher_test.cc
@@ -0,0 +1,475 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "table/block_fetcher.h"
+
+#include "db/table_properties_collector.h"
+#include "options/options_helper.h"
+#include "port/port.h"
+#include "port/stack_trace.h"
+#include "table/block_based/binary_search_index_reader.h"
+#include "table/block_based/block_based_table_builder.h"
+#include "table/block_based/block_based_table_factory.h"
+#include "table/block_based/block_based_table_reader.h"
+#include "table/format.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace {
+
+class CountedMemoryAllocator : public MemoryAllocator {
+ public:
+  const char* Name() const override { return "CountedMemoryAllocator"; }
+
+  void* Allocate(size_t size) override {
+    num_allocations_++;
+    return static_cast<void*>(new char[size]);
+  }
+
+  void Deallocate(void* p) override {
+    num_deallocations_++;
+    delete[] static_cast<char*>(p);
+  }
+
+  int GetNumAllocations() const { return num_allocations_; }
+  int GetNumDeallocations() const { return num_deallocations_; }
+
+ private:
+  int num_allocations_ = 0;
+  int num_deallocations_ = 0;
+};
+
+struct MemcpyStats {
+  int num_stack_buf_memcpy = 0;
+  int num_heap_buf_memcpy = 0;
+  int num_compressed_buf_memcpy = 0;
+};
+
+struct BufAllocationStats {
+  int num_heap_buf_allocations = 0;
+  int num_compressed_buf_allocations = 0;
+};
+
+struct TestStats {
+  MemcpyStats memcpy_stats;
+  BufAllocationStats buf_allocation_stats;
+};
+
+class BlockFetcherTest : public testing::Test {
+ protected:
+  void SetUp() override {
+    test::ResetTmpDirForDirectIO();
+    test_dir_ = test::PerThreadDBPath("block_fetcher_test");
+    env_ = Env::Default();
+    fs_ = FileSystem::Default();
+    ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
+    is_direct_io_supported_ = DetectDirectIOSupport();
+  }
+
+  void TearDown() override { EXPECT_OK(test::DestroyDir(env_, test_dir_)); }
+
+  bool IsDirectIOSupported() const { return is_direct_io_supported_; }
+
+  void AssertSameBlock(const BlockContents& block1,
+                       const BlockContents& block2) {
+    ASSERT_EQ(block1.data.ToString(), block2.data.ToString());
+  }
+
+  // Creates a table with kv pairs (i, i) where i ranges from 0 to 8,
+  // inclusive.
+  void CreateTable(const std::string& table_name,
+                   const CompressionType& compression_type) {
+    std::unique_ptr<WritableFileWriter> writer;
+    NewFileWriter(table_name, &writer);
+
+    // Create table builder.
+    Options options;
+    ImmutableCFOptions ioptions(options);
+    InternalKeyComparator comparator(options.comparator);
+    ColumnFamilyOptions cf_options;
+    MutableCFOptions moptions(cf_options);
+    std::vector<std::unique_ptr<IntTblPropCollectorFactory>> factories;
+    std::unique_ptr<TableBuilder> table_builder(table_factory_.NewTableBuilder(
+        TableBuilderOptions(ioptions, moptions, comparator, &factories,
+                            compression_type, 0 /* sample_for_compression */,
+                            CompressionOptions(), false /* skip_filters */,
+                            kDefaultColumnFamilyName, -1 /* level */),
+        0 /* column_family_id */, writer.get()));
+
+    // Build table.
+    for (int i = 0; i < 9; i++) {
+      std::string key = ToInternalKey(std::to_string(i));
+      std::string value = std::to_string(i);
+      table_builder->Add(key, value);
+    }
+    ASSERT_OK(table_builder->Finish());
+  }
+
+  void FetchIndexBlock(const std::string& table_name, bool use_direct_io,
+                       CountedMemoryAllocator* heap_buf_allocator,
+                       CountedMemoryAllocator* compressed_buf_allocator,
+                       MemcpyStats* memcpy_stats, BlockContents* index_block) {
+    FileOptions fopt;
+    fopt.use_direct_reads = use_direct_io;
+    std::unique_ptr<RandomAccessFileReader> file;
+    NewFileReader(table_name, fopt, &file);
+
+    // Get handle of the index block.
+    Footer footer;
+    ReadFooter(file.get(), &footer);
+    const BlockHandle& index_handle = footer.index_handle();
+
+    CompressionType compression_type;
+    FetchBlock(file.get(), index_handle, BlockType::kIndex,
+               false /* compressed */, false /* do_uncompress */,
+               heap_buf_allocator, compressed_buf_allocator, index_block,
+               memcpy_stats, &compression_type);
+    ASSERT_EQ(compression_type, CompressionType::kNoCompression);
+  }
+
+  // Fetches the first data block in both direct IO and non-direct IO mode.
+  //
+  // compressed: whether the data blocks are compressed;
+  // do_uncompress: whether the data blocks should be uncompressed on fetching.
+  // compression_type: the expected compression type.
+  //
+  // Expects:
+  // Block contents are the same.
+  // Buffer allocation and memory copy statistics are as expected.
+  void TestFetchDataBlock(const std::string& table_name_prefix,
+                          bool compressed, bool do_uncompress,
+                          const TestStats& expected_non_direct_io_stats,
+                          const TestStats& expected_direct_io_stats) {
+    if (!IsDirectIOSupported()) {
+      printf("Skip this test since direct IO is not supported\n");
+      return;
+    }
+
+    for (CompressionType compression_type : GetSupportedCompressions()) {
+      bool do_compress = compression_type != kNoCompression;
+      if (compressed != do_compress) continue;
+      std::string compression_type_str =
+          CompressionTypeToString(compression_type);
+
+      std::string table_name = table_name_prefix + compression_type_str;
+      CreateTable(table_name, compression_type);
+
+      CompressionType expected_compression_type_after_fetch =
+          (compressed && !do_uncompress) ? compression_type : kNoCompression;
+
+      BlockContents blocks[2];
+      MemcpyStats memcpy_stats[2];
+      CountedMemoryAllocator heap_buf_allocators[2];
+      CountedMemoryAllocator compressed_buf_allocators[2];
+      for (bool use_direct_io : {false, true}) {
+        FetchFirstDataBlock(
+            table_name, use_direct_io, compressed, do_uncompress,
+            expected_compression_type_after_fetch,
+            &heap_buf_allocators[use_direct_io],
+            &compressed_buf_allocators[use_direct_io], &blocks[use_direct_io],
+            &memcpy_stats[use_direct_io]);
+      }
+
+      AssertSameBlock(blocks[0], blocks[1]);
+
+      // Check memcpy and buffer allocation statistics.
+      for (bool use_direct_io : {false, true}) {
+        const TestStats& expected_stats = use_direct_io
+                                              ? expected_direct_io_stats
+                                              : expected_non_direct_io_stats;
+
+        ASSERT_EQ(memcpy_stats[use_direct_io].num_stack_buf_memcpy,
+                  expected_stats.memcpy_stats.num_stack_buf_memcpy);
+        ASSERT_EQ(memcpy_stats[use_direct_io].num_heap_buf_memcpy,
+                  expected_stats.memcpy_stats.num_heap_buf_memcpy);
+        ASSERT_EQ(memcpy_stats[use_direct_io].num_compressed_buf_memcpy,
+                  expected_stats.memcpy_stats.num_compressed_buf_memcpy);
+
+        ASSERT_EQ(heap_buf_allocators[use_direct_io].GetNumAllocations(),
+                  expected_stats.buf_allocation_stats.num_heap_buf_allocations);
+        ASSERT_EQ(
+            compressed_buf_allocators[use_direct_io].GetNumAllocations(),
+            expected_stats.buf_allocation_stats.num_compressed_buf_allocations);
+
+        // The allocated buffers are not deallocated until
+        // the block content is deleted.
+        ASSERT_EQ(heap_buf_allocators[use_direct_io].GetNumDeallocations(), 0);
+        ASSERT_EQ(
+            compressed_buf_allocators[use_direct_io].GetNumDeallocations(), 0);
+        blocks[use_direct_io].allocation.reset();
+        ASSERT_EQ(heap_buf_allocators[use_direct_io].GetNumDeallocations(),
+                  expected_stats.buf_allocation_stats.num_heap_buf_allocations);
+        ASSERT_EQ(
+            compressed_buf_allocators[use_direct_io].GetNumDeallocations(),
+            expected_stats.buf_allocation_stats.num_compressed_buf_allocations);
+      }
+    }
+  }
+
+ private:
+  std::string test_dir_;
+  Env* env_;
+  std::shared_ptr<FileSystem> fs_;
+  BlockBasedTableFactory table_factory_;
+  bool is_direct_io_supported_;
+
+  std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
+
+  void WriteToFile(const std::string& content, const std::string& filename) {
+    std::unique_ptr<FSWritableFile> f;
+    ASSERT_OK(fs_->NewWritableFile(Path(filename), FileOptions(), &f, nullptr));
+    ASSERT_OK(f->Append(content, IOOptions(), nullptr));
+    ASSERT_OK(f->Close(IOOptions(), nullptr));
+  }
+
+  bool DetectDirectIOSupport() {
+    WriteToFile("", ".direct");
+    FileOptions opt;
+    opt.use_direct_reads = true;
+    std::unique_ptr<FSRandomAccessFile> f;
+    auto s = fs_->NewRandomAccessFile(Path(".direct"), opt, &f, nullptr);
+    return s.ok();
+  }
+
+  void NewFileWriter(const std::string& filename,
+                     std::unique_ptr<WritableFileWriter>* writer) {
+    std::string path = Path(filename);
+    EnvOptions env_options;
+    std::unique_ptr<WritableFile> file;
+    ASSERT_OK(env_->NewWritableFile(path, &file, env_options));
+    writer->reset(new WritableFileWriter(
+        NewLegacyWritableFileWrapper(std::move(file)), path, env_options));
+  }
+
+  void NewFileReader(const std::string& filename, const FileOptions& opt,
+                     std::unique_ptr<RandomAccessFileReader>* reader) {
+    std::string path = Path(filename);
+    std::unique_ptr<FSRandomAccessFile> f;
+    ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr));
+    reader->reset(new RandomAccessFileReader(std::move(f), path, env_));
+  }
+
+  void NewTableReader(const ImmutableCFOptions& ioptions,
+                      const FileOptions& foptions,
+                      const InternalKeyComparator& comparator,
+                      const std::string& table_name,
+                      std::unique_ptr<BlockBasedTable>* table) {
+    std::unique_ptr<RandomAccessFileReader> file;
+    NewFileReader(table_name, foptions, &file);
+
+    uint64_t file_size = 0;
+    ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size));
+
+    std::unique_ptr<TableReader> table_reader;
+    ASSERT_OK(BlockBasedTable::Open(ioptions, EnvOptions(),
+                                    table_factory_.table_options(), comparator,
+                                    std::move(file), file_size, &table_reader));
+
+    table->reset(reinterpret_cast<BlockBasedTable*>(table_reader.release()));
+  }
+
+  std::string ToInternalKey(const std::string& key) {
+    InternalKey internal_key(key, 0, ValueType::kTypeValue);
+    return internal_key.Encode().ToString();
+  }
+
+  void ReadFooter(RandomAccessFileReader* file, Footer* footer) {
+    uint64_t file_size = 0;
+    ASSERT_OK(env_->GetFileSize(file->file_name(), &file_size));
+    ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size, footer,
+                       kBlockBasedTableMagicNumber);
+  }
+
+  // NOTE: compression_type returns the compression type of the fetched block
+  // contents, so if the block is fetched and uncompressed, then it's
+  // kNoCompression.
+  void FetchBlock(RandomAccessFileReader* file, const BlockHandle& block,
+                  BlockType block_type, bool compressed, bool do_uncompress,
+                  MemoryAllocator* heap_buf_allocator,
+                  MemoryAllocator* compressed_buf_allocator,
+                  BlockContents* contents, MemcpyStats* stats,
+                  CompressionType* compression_type) {
+    Options options;
+    ImmutableCFOptions ioptions(options);
+    ReadOptions roptions;
+    PersistentCacheOptions persistent_cache_options;
+    Footer footer;
+    ReadFooter(file, &footer);
+    std::unique_ptr<BlockFetcher> fetcher(new BlockFetcher(
+        file, nullptr /* prefetch_buffer */, footer, roptions, block, contents,
+        ioptions, do_uncompress, compressed, block_type,
+        UncompressionDict::GetEmptyDict(), persistent_cache_options,
+        heap_buf_allocator, compressed_buf_allocator));
+
+    ASSERT_OK(fetcher->ReadBlockContents());
+
+    stats->num_stack_buf_memcpy = fetcher->TEST_GetNumStackBufMemcpy();
+    stats->num_heap_buf_memcpy = fetcher->TEST_GetNumHeapBufMemcpy();
+    stats->num_compressed_buf_memcpy =
+        fetcher->TEST_GetNumCompressedBufMemcpy();
+
+    *compression_type = fetcher->get_compression_type();
+  }
+
+  // NOTE: expected_compression_type is the expected compression
+  // type of the fetched block content; if the block is uncompressed,
+  // then the expected compression type is kNoCompression.
+  void FetchFirstDataBlock(const std::string& table_name, bool use_direct_io,
+                           bool compressed, bool do_uncompress,
+                           CompressionType expected_compression_type,
+                           MemoryAllocator* heap_buf_allocator,
+                           MemoryAllocator* compressed_buf_allocator,
+                           BlockContents* block, MemcpyStats* memcpy_stats) {
+    if (use_direct_io && !IsDirectIOSupported()) {
+      printf("Skip this test since direct IO is not supported\n");
+      return;
+    }
+
+    Options options;
+    ImmutableCFOptions ioptions(options);
+    InternalKeyComparator comparator(options.comparator);
+
+    FileOptions foptions;
+    foptions.use_direct_reads = use_direct_io;
+
+    // Get block handle for the first data block.
+    std::unique_ptr<BlockBasedTable> table;
+    NewTableReader(ioptions, foptions, comparator, table_name, &table);
+
+    std::unique_ptr<BlockBasedTable::IndexReader> index_reader;
+    ASSERT_OK(BinarySearchIndexReader::Create(
+        table.get(), nullptr /* prefetch_buffer */, false /* use_cache */,
+        false /* prefetch */, false /* pin */, nullptr /* lookup_context */,
+        &index_reader));
+
+    std::unique_ptr<InternalIteratorBase<IndexValue>> iter(
+        index_reader->NewIterator(
+            ReadOptions(), false /* disable_prefix_seek */, nullptr /* iter */,
+            nullptr /* get_context */, nullptr /* lookup_context */));
+    ASSERT_OK(iter->status());
+    iter->SeekToFirst();
+    BlockHandle first_block_handle = iter->value().handle;
+
+    // Fetch first data block.
+    std::unique_ptr<RandomAccessFileReader> file;
+    NewFileReader(table_name, foptions, &file);
+    CompressionType compression_type;
+    FetchBlock(file.get(), first_block_handle, BlockType::kData, compressed,
+               do_uncompress, heap_buf_allocator, compressed_buf_allocator,
+               block, memcpy_stats, &compression_type);
+    ASSERT_EQ(compression_type, expected_compression_type);
+  }
+};
+
+// Fetch index block under both direct IO and non-direct IO.
+// Expects:
+// the index block contents are the same for both read modes.
+TEST_F(BlockFetcherTest, FetchIndexBlock) {
+  if (!IsDirectIOSupported()) {
+    printf("Skip this test since direct IO is not supported\n");
+    return;
+  }
+
+  for (CompressionType compression : GetSupportedCompressions()) {
+    std::string table_name =
+        "FetchIndexBlock" + CompressionTypeToString(compression);
+    CreateTable(table_name, compression);
+
+    CountedMemoryAllocator allocator;
+    MemcpyStats memcpy_stats;
+    BlockContents indexes[2];
+    for (bool use_direct_io : {false, true}) {
+      FetchIndexBlock(table_name, use_direct_io, &allocator, &allocator,
+                      &memcpy_stats, &indexes[use_direct_io]);
+    }
+    AssertSameBlock(indexes[0], indexes[1]);
+  }
+}
+
+// Data blocks are not compressed,
+// fetch data block under both direct IO and non-direct IO.
+// Expects:
+// 1. in non-direct IO mode, allocate a heap buffer and memcpy the block
+//    into the buffer;
+// 2. in direct IO mode, allocate a heap buffer and memcpy from the
+//    direct IO buffer to the heap buffer.
+TEST_F(BlockFetcherTest, FetchUncompressedDataBlock) {
+  MemcpyStats memcpy_stats;
+  memcpy_stats.num_heap_buf_memcpy = 1;
+
+  BufAllocationStats buf_allocation_stats;
+  buf_allocation_stats.num_heap_buf_allocations = 1;
+
+  TestStats expected_stats{memcpy_stats, buf_allocation_stats};
+
+  TestFetchDataBlock("FetchUncompressedDataBlock", false, false,
+                     expected_stats, expected_stats);
+}
+
+// Data blocks are compressed,
+// fetch data block under both direct IO and non-direct IO,
+// but do not uncompress.
+// Expects:
+// 1. in non-direct IO mode, allocate a compressed buffer and memcpy the block
+//    into the buffer;
+// 2. in direct IO mode, allocate a compressed buffer and memcpy from the
+//    direct IO buffer to the compressed buffer.
+TEST_F(BlockFetcherTest, FetchCompressedDataBlock) {
+  MemcpyStats memcpy_stats;
+  memcpy_stats.num_compressed_buf_memcpy = 1;
+
+  BufAllocationStats buf_allocation_stats;
+  buf_allocation_stats.num_compressed_buf_allocations = 1;
+
+  TestStats expected_stats{memcpy_stats, buf_allocation_stats};
+
+  TestFetchDataBlock("FetchCompressedDataBlock", true, false, expected_stats,
+                     expected_stats);
+}
+
+// Data blocks are compressed,
+// fetch and uncompress data block under both direct IO and non-direct IO.
+// Expects:
+// 1. in non-direct IO mode, since the block is small, it is first memcpy-ed
+//    to the stack buffer, then a heap buffer is allocated and the block is
+//    uncompressed into the heap buffer.
+// 2. in direct IO mode, allocate a heap buffer, then directly uncompress
+//    and memcpy from the direct IO buffer to the heap buffer.
+TEST_F(BlockFetcherTest, FetchAndUncompressCompressedDataBlock) {
+  TestStats expected_non_direct_io_stats;
+  {
+    MemcpyStats memcpy_stats;
+    memcpy_stats.num_stack_buf_memcpy = 1;
+    memcpy_stats.num_heap_buf_memcpy = 1;
+
+    BufAllocationStats buf_allocation_stats;
+    buf_allocation_stats.num_heap_buf_allocations = 1;
+    buf_allocation_stats.num_compressed_buf_allocations = 0;
+
+    expected_non_direct_io_stats = {memcpy_stats, buf_allocation_stats};
+  }
+
+  TestStats expected_direct_io_stats;
+  {
+    MemcpyStats memcpy_stats;
+    memcpy_stats.num_heap_buf_memcpy = 1;
+
+    BufAllocationStats buf_allocation_stats;
+    buf_allocation_stats.num_heap_buf_allocations = 1;
+
+    expected_direct_io_stats = {memcpy_stats, buf_allocation_stats};
+  }
+
+  TestFetchDataBlock("FetchAndUncompressCompressedDataBlock", true, true,
+                     expected_non_direct_io_stats, expected_direct_io_stats);
+}
+
+}  // namespace
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/table/format.cc b/table/format.cc
index 5bfd88ebb..3bc964424 100644
--- a/table/format.cc
+++ b/table/format.cc
@@ -293,7 +293,7 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
   }
 
   std::string footer_buf;
-  std::unique_ptr<char[]> internal_buf;
+  AlignedBuf internal_buf;
   Slice footer_input;
   size_t read_offset =
       (file_size > Footer::kMaxEncodedLength)
diff --git a/test_util/testutil.cc b/test_util/testutil.cc
index 86811f71f..65c8a7c93 100644
--- a/test_util/testutil.cc
+++ b/test_util/testutil.cc
@@ -508,5 +508,18 @@ size_t GetLinesCount(const std::string& fname, const std::string& pattern) {
   return count;
 }
 
+void ResetTmpDirForDirectIO() {
+#ifdef OS_LINUX
+  unsetenv("TEST_TMPDIR");
+  char* tmpdir = getenv("DISK_TEMP_DIR");
+  if (tmpdir == nullptr) {
+    tmpdir = getenv("HOME");
+  }
+  if (tmpdir != nullptr) {
+    setenv("TEST_TMPDIR", tmpdir, 1);
+  }
+#endif
+}
+
 }  // namespace test
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/test_util/testutil.h b/test_util/testutil.h
index 965d65664..ef1f2913e 100644
--- a/test_util/testutil.h
+++ b/test_util/testutil.h
@@ -800,5 +800,10 @@ bool IsDirectIOSupported(Env* env, const std::string& dir);
 // Return the number of lines where a given pattern was found in a file.
 size_t GetLinesCount(const std::string& fname, const std::string& pattern);
 
+// TEST_TMPDIR may be set to /dev/shm in Makefile,
+// but /dev/shm does not support direct IO.
+// Tries to set TEST_TMPDIR to a directory supporting direct IO.
+void ResetTmpDirForDirectIO();
+
 }  // namespace test
 }  // namespace ROCKSDB_NAMESPACE
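Usage note: a test that exercises direct IO is expected to call the new helper before resolving any temporary paths, mirroring the updated SetUp() in random_access_file_reader_test.cc and block_fetcher_test.cc above. A minimal sketch, assuming a hypothetical fixture name MyDirectIOTest:

    #include <string>

    #include "test_util/testharness.h"
    #include "test_util/testutil.h"

    namespace ROCKSDB_NAMESPACE {

    class MyDirectIOTest : public testing::Test {  // hypothetical fixture
     public:
      void SetUp() override {
        // Re-points TEST_TMPDIR away from tmpfs mounts such as /dev/shm,
        // which reject O_DIRECT opens; a no-op on non-Linux platforms.
        test::ResetTmpDirForDirectIO();
        test_dir_ = test::PerThreadDBPath("my_direct_io_test");
      }

     private:
      std::string test_dir_;
    };

    }  // namespace ROCKSDB_NAMESPACE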