Skip unnecessary allocation for mmap reads under 5000 bytes (#7043)

Summary:
With mmap enabled on an uncompressed file, we were previously always doing a heap allocation to obtain the scratch buffer for `RandomAccessFileReader::Read()`. However, that allocation was unnecessary as the underlying file reader returned a pointer into its mapped memory, not the provided scratch buffer. This PR makes passes the `BlockFetcher`'s inline buffer as the scratch buffer if the data block is small enough (less than `kDefaultStackBufferSize` bytes, currently 5000). Ideally we would not pass a scratch buffer at all for an mmap read; however, the `RandomAccessFile::Read()` API guarantees such a buffer is provided, and non-standard implementations may be relying on it even when `Options::allow_mmap_reads == true`. In that case, this PR still works but introduces an extra copy from the inline buffer to a heap buffer.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/7043

Reviewed By: cheng-chang

Differential Revision: D22320606

Pulled By: ajkr

fbshipit-source-id: ad964dd23df34e07d979c6032c2dfe5454c98b52
main
Andrew Kryczka 5 years ago committed by Facebook GitHub Bot
parent e367bc7f4b
commit 8458532d58
  1. 26
      table/block_fetcher.cc
  2. 285
      table/block_fetcher_test.cc

@ -99,10 +99,28 @@ inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
inline void BlockFetcher::PrepareBufferForBlockFromFile() {
// cache miss read from device
if (do_uncompress_ &&
if ((do_uncompress_ || ioptions_.allow_mmap_reads) &&
block_size_with_trailer_ < kDefaultStackBufferSize) {
// If we've got a small enough hunk of data, read it in to the
// trivially allocated stack buffer instead of needing a full malloc()
//
// `GetBlockContents()` cannot return this data as its lifetime is tied to
// this `BlockFetcher`'s lifetime. That is fine because this is only used
// in cases where we do not expect the `GetBlockContents()` result to be the
// same buffer we are assigning here. If we guess incorrectly, there will be
// a heap allocation and memcpy in `GetBlockContents()` to obtain the final
// result. Considering we are eliding a heap allocation here by using the
// stack buffer, the cost of guessing incorrectly here is one extra memcpy.
//
// When `do_uncompress_` is true, we expect the uncompression step will
// allocate heap memory for the final result. However this expectation will
// be wrong if the block turns out to already be uncompressed, which we
// won't know for sure until after reading it.
//
// When `ioptions_.allow_mmap_reads` is true, we do not expect the file
// reader to use the scratch buffer at all, but instead return a pointer
// into the mapped memory. This expectation will be wrong when using a
// file reader that does not implement mmap reads properly.
used_buf_ = &stack_buf_[0];
} else if (maybe_compressed_ && !do_uncompress_) {
compressed_buf_ = AllocateBlock(block_size_with_trailer_,
@ -226,11 +244,11 @@ Status BlockFetcher::ReadBlockContents() {
&slice_, used_buf_, nullptr, for_compaction_);
PERF_COUNTER_ADD(block_read_count, 1);
#ifndef NDEBUG
if (used_buf_ == &stack_buf_[0]) {
if (slice_.data() == &stack_buf_[0]) {
num_stack_buf_memcpy_++;
} else if (used_buf_ == heap_buf_.get()) {
} else if (slice_.data() == heap_buf_.get()) {
num_heap_buf_memcpy_++;
} else if (used_buf_ == compressed_buf_.get()) {
} else if (slice_.data() == compressed_buf_.get()) {
num_compressed_buf_memcpy_++;
}
#endif

@ -42,14 +42,14 @@ class CountedMemoryAllocator : public MemoryAllocator {
};
struct MemcpyStats {
int num_stack_buf_memcpy = 0;
int num_heap_buf_memcpy = 0;
int num_compressed_buf_memcpy = 0;
int num_stack_buf_memcpy;
int num_heap_buf_memcpy;
int num_compressed_buf_memcpy;
};
struct BufAllocationStats {
int num_heap_buf_allocations = 0;
int num_compressed_buf_allocations = 0;
int num_heap_buf_allocations;
int num_compressed_buf_allocations;
};
struct TestStats {
@ -58,6 +58,17 @@ struct TestStats {
};
class BlockFetcherTest : public testing::Test {
public:
enum class Mode {
kBufferedRead = 0,
kBufferedMmap,
kDirectRead,
kNumModes,
};
// use NumModes as array size to avoid "size of array '...' has non-integral
// type" errors.
const static int NumModes = static_cast<int>(Mode::kNumModes);
protected:
void SetUp() override {
test::SetupSyncPointsToMockDirectIO();
@ -69,9 +80,8 @@ class BlockFetcherTest : public testing::Test {
void TearDown() override { EXPECT_OK(test::DestroyDir(env_, test_dir_)); }
void AssertSameBlock(const BlockContents& block1,
const BlockContents& block2) {
ASSERT_EQ(block1.data.ToString(), block2.data.ToString());
void AssertSameBlock(const std::string& block1, const std::string& block2) {
ASSERT_EQ(block1, block2);
}
// Creates a table with kv pairs (i, i) where i ranges from 0 to 9, inclusive.
@ -81,10 +91,9 @@ class BlockFetcherTest : public testing::Test {
NewFileWriter(table_name, &writer);
// Create table builder.
Options options;
ImmutableCFOptions ioptions(options);
InternalKeyComparator comparator(options.comparator);
ColumnFamilyOptions cf_options;
ImmutableCFOptions ioptions(options_);
InternalKeyComparator comparator(options_.comparator);
ColumnFamilyOptions cf_options(options_);
MutableCFOptions moptions(cf_options);
std::vector<std::unique_ptr<IntTblPropCollectorFactory>> factories;
std::unique_ptr<TableBuilder> table_builder(table_factory_.NewTableBuilder(
@ -103,12 +112,12 @@ class BlockFetcherTest : public testing::Test {
ASSERT_OK(table_builder->Finish());
}
void FetchIndexBlock(const std::string& table_name, bool use_direct_io,
void FetchIndexBlock(const std::string& table_name,
CountedMemoryAllocator* heap_buf_allocator,
CountedMemoryAllocator* compressed_buf_allocator,
MemcpyStats* memcpy_stats, BlockContents* index_block) {
FileOptions fopt;
fopt.use_direct_reads = use_direct_io;
MemcpyStats* memcpy_stats, BlockContents* index_block,
std::string* result) {
FileOptions fopt(options_);
std::unique_ptr<RandomAccessFileReader> file;
NewFileReader(table_name, fopt, &file);
@ -123,6 +132,7 @@ class BlockFetcherTest : public testing::Test {
heap_buf_allocator, compressed_buf_allocator, index_block,
memcpy_stats, &compression_type);
ASSERT_EQ(compression_type, CompressionType::kNoCompression);
result->assign(index_block->data.ToString());
}
// Fetches the first data block in both direct IO and non-direct IO mode.
@ -134,10 +144,9 @@ class BlockFetcherTest : public testing::Test {
// Expects:
// Block contents are the same.
// Bufferr allocation and memory copy statistics are expected.
void TestFetchDataBlock(const std::string& table_name_prefix, bool compressed,
bool do_uncompress,
const TestStats& expected_non_direct_io_stats,
const TestStats& expected_direct_io_stats) {
void TestFetchDataBlock(
const std::string& table_name_prefix, bool compressed, bool do_uncompress,
std::array<TestStats, NumModes> expected_stats_by_mode) {
for (CompressionType compression_type : GetSupportedCompressions()) {
bool do_compress = compression_type != kNoCompression;
if (compressed != do_compress) continue;
@ -150,60 +159,80 @@ class BlockFetcherTest : public testing::Test {
CompressionType expected_compression_type_after_fetch =
(compressed && !do_uncompress) ? compression_type : kNoCompression;
BlockContents blocks[2];
MemcpyStats memcpy_stats[2];
CountedMemoryAllocator heap_buf_allocators[2];
CountedMemoryAllocator compressed_buf_allocators[2];
for (bool use_direct_io : {false, true}) {
FetchFirstDataBlock(
table_name, use_direct_io, compressed, do_uncompress,
BlockContents blocks[NumModes];
std::string block_datas[NumModes];
MemcpyStats memcpy_stats[NumModes];
CountedMemoryAllocator heap_buf_allocators[NumModes];
CountedMemoryAllocator compressed_buf_allocators[NumModes];
for (int i = 0; i < NumModes; ++i) {
SetMode(static_cast<Mode>(i));
FetchFirstDataBlock(table_name, compressed, do_uncompress,
expected_compression_type_after_fetch,
&heap_buf_allocators[use_direct_io],
&compressed_buf_allocators[use_direct_io], &blocks[use_direct_io],
&memcpy_stats[use_direct_io]);
&heap_buf_allocators[i],
&compressed_buf_allocators[i], &blocks[i],
&block_datas[i], &memcpy_stats[i]);
}
AssertSameBlock(blocks[0], blocks[1]);
for (int i = 0; i < NumModes - 1; ++i) {
AssertSameBlock(block_datas[i], block_datas[i + 1]);
}
// Check memcpy and buffer allocation statistics.
for (bool use_direct_io : {false, true}) {
const TestStats& expected_stats = use_direct_io
? expected_direct_io_stats
: expected_non_direct_io_stats;
for (int i = 0; i < NumModes; ++i) {
const TestStats& expected_stats = expected_stats_by_mode[i];
ASSERT_EQ(memcpy_stats[use_direct_io].num_stack_buf_memcpy,
ASSERT_EQ(memcpy_stats[i].num_stack_buf_memcpy,
expected_stats.memcpy_stats.num_stack_buf_memcpy);
ASSERT_EQ(memcpy_stats[use_direct_io].num_heap_buf_memcpy,
ASSERT_EQ(memcpy_stats[i].num_heap_buf_memcpy,
expected_stats.memcpy_stats.num_heap_buf_memcpy);
ASSERT_EQ(memcpy_stats[use_direct_io].num_compressed_buf_memcpy,
ASSERT_EQ(memcpy_stats[i].num_compressed_buf_memcpy,
expected_stats.memcpy_stats.num_compressed_buf_memcpy);
ASSERT_EQ(heap_buf_allocators[use_direct_io].GetNumAllocations(),
ASSERT_EQ(heap_buf_allocators[i].GetNumAllocations(),
expected_stats.buf_allocation_stats.num_heap_buf_allocations);
ASSERT_EQ(
compressed_buf_allocators[use_direct_io].GetNumAllocations(),
compressed_buf_allocators[i].GetNumAllocations(),
expected_stats.buf_allocation_stats.num_compressed_buf_allocations);
// The allocated buffers are not deallocated until
// the block content is deleted.
ASSERT_EQ(heap_buf_allocators[use_direct_io].GetNumDeallocations(), 0);
ASSERT_EQ(
compressed_buf_allocators[use_direct_io].GetNumDeallocations(), 0);
blocks[use_direct_io].allocation.reset();
ASSERT_EQ(heap_buf_allocators[use_direct_io].GetNumDeallocations(),
ASSERT_EQ(heap_buf_allocators[i].GetNumDeallocations(), 0);
ASSERT_EQ(compressed_buf_allocators[i].GetNumDeallocations(), 0);
blocks[i].allocation.reset();
ASSERT_EQ(heap_buf_allocators[i].GetNumDeallocations(),
expected_stats.buf_allocation_stats.num_heap_buf_allocations);
ASSERT_EQ(
compressed_buf_allocators[use_direct_io].GetNumDeallocations(),
compressed_buf_allocators[i].GetNumDeallocations(),
expected_stats.buf_allocation_stats.num_compressed_buf_allocations);
}
}
}
void SetMode(Mode mode) {
switch (mode) {
case Mode::kBufferedRead:
options_.use_direct_reads = false;
options_.allow_mmap_reads = false;
break;
case Mode::kBufferedMmap:
options_.use_direct_reads = false;
options_.allow_mmap_reads = true;
break;
case Mode::kDirectRead:
options_.use_direct_reads = true;
options_.allow_mmap_reads = false;
break;
case Mode::kNumModes:
assert(false);
}
}
private:
std::string test_dir_;
Env* env_;
std::shared_ptr<FileSystem> fs_;
BlockBasedTableFactory table_factory_;
Options options_;
std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
@ -274,8 +303,7 @@ class BlockFetcherTest : public testing::Test {
MemoryAllocator* compressed_buf_allocator,
BlockContents* contents, MemcpyStats* stats,
CompressionType* compresstion_type) {
Options options;
ImmutableCFOptions ioptions(options);
ImmutableCFOptions ioptions(options_);
ReadOptions roptions;
PersistentCacheOptions persistent_cache_options;
Footer footer;
@ -299,18 +327,16 @@ class BlockFetcherTest : public testing::Test {
// NOTE: expected_compression_type is the expected compression
// type of the fetched block content, if the block is uncompressed,
// then the expected compression type is kNoCompression.
void FetchFirstDataBlock(const std::string& table_name, bool use_direct_io,
bool compressed, bool do_uncompress,
void FetchFirstDataBlock(const std::string& table_name, bool compressed,
bool do_uncompress,
CompressionType expected_compression_type,
MemoryAllocator* heap_buf_allocator,
MemoryAllocator* compressed_buf_allocator,
BlockContents* block, MemcpyStats* memcpy_stats) {
Options options;
ImmutableCFOptions ioptions(options);
InternalKeyComparator comparator(options.comparator);
FileOptions foptions;
foptions.use_direct_reads = use_direct_io;
BlockContents* block, std::string* result,
MemcpyStats* memcpy_stats) {
ImmutableCFOptions ioptions(options_);
InternalKeyComparator comparator(options_.comparator);
FileOptions foptions(options_);
// Get block handle for the first data block.
std::unique_ptr<BlockBasedTable> table;
@ -339,6 +365,7 @@ class BlockFetcherTest : public testing::Test {
do_uncompress, heap_buf_allocator, compressed_buf_allocator,
block, memcpy_stats, &compression_type);
ASSERT_EQ(compression_type, expected_compression_type);
result->assign(block->data.ToString());
}
};
@ -356,33 +383,53 @@ TEST_F(BlockFetcherTest, FetchIndexBlock) {
CountedMemoryAllocator allocator;
MemcpyStats memcpy_stats;
BlockContents indexes[2];
for (bool use_direct_io : {false, true}) {
FetchIndexBlock(table_name, use_direct_io, &allocator, &allocator,
&memcpy_stats, &indexes[use_direct_io]);
BlockContents indexes[NumModes];
std::string index_datas[NumModes];
for (int i = 0; i < NumModes; ++i) {
SetMode(static_cast<Mode>(i));
FetchIndexBlock(table_name, &allocator, &allocator, &memcpy_stats,
&indexes[i], &index_datas[i]);
}
for (int i = 0; i < NumModes - 1; ++i) {
AssertSameBlock(index_datas[i], index_datas[i + 1]);
}
AssertSameBlock(indexes[0], indexes[1]);
}
}
// Data blocks are not compressed,
// fetch data block under both direct IO and non-direct IO.
// fetch data block under direct IO, mmap IO,and non-direct IO.
// Expects:
// 1. in non-direct IO mode, allocate a heap buffer and memcpy the block
// into the buffer;
// 2. in direct IO mode, allocate a heap buffer and memcpy from the
// direct IO buffer to the heap buffer.
TEST_F(BlockFetcherTest, FetchUncompressedDataBlock) {
MemcpyStats memcpy_stats;
memcpy_stats.num_heap_buf_memcpy = 1;
BufAllocationStats buf_allocation_stats;
buf_allocation_stats.num_heap_buf_allocations = 1;
TestStats expected_stats{memcpy_stats, buf_allocation_stats};
TestFetchDataBlock("FetchUncompressedDataBlock", false, false, expected_stats,
expected_stats);
TestStats expected_non_mmap_stats = {
{
0 /* num_stack_buf_memcpy */,
1 /* num_heap_buf_memcpy */,
0 /* num_compressed_buf_memcpy */,
},
{
1 /* num_heap_buf_allocations */,
0 /* num_compressed_buf_allocations */,
}};
TestStats expected_mmap_stats = {{
0 /* num_stack_buf_memcpy */,
0 /* num_heap_buf_memcpy */,
0 /* num_compressed_buf_memcpy */,
},
{
0 /* num_heap_buf_allocations */,
0 /* num_compressed_buf_allocations */,
}};
std::array<TestStats, NumModes> expected_stats_by_mode{{
expected_non_mmap_stats /* kBufferedRead */,
expected_mmap_stats /* kBufferedMmap */,
expected_non_mmap_stats /* kDirectRead */,
}};
TestFetchDataBlock("FetchUncompressedDataBlock", false, false,
expected_stats_by_mode);
}
// Data blocks are compressed,
@ -394,16 +441,32 @@ TEST_F(BlockFetcherTest, FetchUncompressedDataBlock) {
// 2. in direct IO mode, allocate a compressed buffer and memcpy from the
// direct IO buffer to the compressed buffer.
TEST_F(BlockFetcherTest, FetchCompressedDataBlock) {
MemcpyStats memcpy_stats;
memcpy_stats.num_compressed_buf_memcpy = 1;
BufAllocationStats buf_allocation_stats;
buf_allocation_stats.num_compressed_buf_allocations = 1;
TestStats expected_stats{memcpy_stats, buf_allocation_stats};
TestFetchDataBlock("FetchCompressedDataBlock", true, false, expected_stats,
expected_stats);
TestStats expected_non_mmap_stats = {
{
0 /* num_stack_buf_memcpy */,
0 /* num_heap_buf_memcpy */,
1 /* num_compressed_buf_memcpy */,
},
{
0 /* num_heap_buf_allocations */,
1 /* num_compressed_buf_allocations */,
}};
TestStats expected_mmap_stats = {{
0 /* num_stack_buf_memcpy */,
0 /* num_heap_buf_memcpy */,
0 /* num_compressed_buf_memcpy */,
},
{
0 /* num_heap_buf_allocations */,
0 /* num_compressed_buf_allocations */,
}};
std::array<TestStats, NumModes> expected_stats_by_mode{{
expected_non_mmap_stats /* kBufferedRead */,
expected_mmap_stats /* kBufferedMmap */,
expected_non_mmap_stats /* kDirectRead */,
}};
TestFetchDataBlock("FetchCompressedDataBlock", true, false,
expected_stats_by_mode);
}
// Data blocks are compressed,
@ -415,32 +478,42 @@ TEST_F(BlockFetcherTest, FetchCompressedDataBlock) {
// 2. in direct IO mode mode, allocate a heap buffer, then directly uncompress
// and memcpy from the direct IO buffer to the heap buffer.
TEST_F(BlockFetcherTest, FetchAndUncompressCompressedDataBlock) {
TestStats expected_non_direct_io_stats;
TestStats expected_buffered_read_stats = {
{
MemcpyStats memcpy_stats;
memcpy_stats.num_stack_buf_memcpy = 1;
memcpy_stats.num_heap_buf_memcpy = 1;
BufAllocationStats buf_allocation_stats;
buf_allocation_stats.num_heap_buf_allocations = 1;
buf_allocation_stats.num_compressed_buf_allocations = 0;
expected_non_direct_io_stats = {memcpy_stats, buf_allocation_stats};
}
TestStats expected_direct_io_stats;
1 /* num_stack_buf_memcpy */,
1 /* num_heap_buf_memcpy */,
0 /* num_compressed_buf_memcpy */,
},
{
MemcpyStats memcpy_stats;
memcpy_stats.num_heap_buf_memcpy = 1;
BufAllocationStats buf_allocation_stats;
buf_allocation_stats.num_heap_buf_allocations = 1;
expected_direct_io_stats = {memcpy_stats, buf_allocation_stats};
}
1 /* num_heap_buf_allocations */,
0 /* num_compressed_buf_allocations */,
}};
TestStats expected_mmap_stats = {{
0 /* num_stack_buf_memcpy */,
1 /* num_heap_buf_memcpy */,
0 /* num_compressed_buf_memcpy */,
},
{
1 /* num_heap_buf_allocations */,
0 /* num_compressed_buf_allocations */,
}};
TestStats expected_direct_read_stats = {
{
0 /* num_stack_buf_memcpy */,
1 /* num_heap_buf_memcpy */,
0 /* num_compressed_buf_memcpy */,
},
{
1 /* num_heap_buf_allocations */,
0 /* num_compressed_buf_allocations */,
}};
std::array<TestStats, NumModes> expected_stats_by_mode{{
expected_buffered_read_stats,
expected_mmap_stats,
expected_direct_read_stats,
}};
TestFetchDataBlock("FetchAndUncompressCompressedDataBlock", true, true,
expected_non_direct_io_stats, expected_direct_io_stats);
expected_stats_by_mode);
}
#endif // ROCKSDB_LITE

Loading…
Cancel
Save