diff --git a/CMakeLists.txt b/CMakeLists.txt index b2f8c0327..85e468748 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1069,6 +1069,7 @@ if(WITH_TESTS) options/options_settable_test.cc options/options_test.cc table/block_based/block_based_filter_block_test.cc + table/block_based/block_based_table_reader_test.cc table/block_based/block_test.cc table/block_based/data_block_hash_index_test.cc table/block_based/full_filter_block_test.cc diff --git a/Makefile b/Makefile index b69b5286f..3e33b40de 100644 --- a/Makefile +++ b/Makefile @@ -539,6 +539,7 @@ TESTS = \ random_access_file_reader_test \ file_reader_writer_test \ block_based_filter_block_test \ + block_based_table_reader_test \ full_filter_block_test \ partitioned_filter_block_test \ hash_table_test \ @@ -1568,6 +1569,9 @@ file_reader_writer_test: util/file_reader_writer_test.o $(LIBOBJECTS) $(TESTHARN block_based_filter_block_test: table/block_based/block_based_filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +block_based_table_reader_test: table/block_based/block_based_table_reader_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + full_filter_block_test: table/block_based/full_filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 57f2b647b..d6b1e5687 100644 --- a/TARGETS +++ b/TARGETS @@ -515,6 +515,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "block_based_table_reader_test", + "table/block_based/block_based_table_reader_test.cc", + "serial", + [], + [], + ], [ "block_cache_trace_analyzer_test", "tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc", diff --git a/file/random_access_file_reader_test.cc b/file/random_access_file_reader_test.cc index 499021ec5..83791a9cd 100644 --- a/file/random_access_file_reader_test.cc +++ b/file/random_access_file_reader_test.cc @@ -15,7 +15,7 @@ namespace ROCKSDB_NAMESPACE { class RandomAccessFileReaderTest : public testing::Test { public: void SetUp() override { - test::ResetTmpDirForDirectIO(); + 
test::SetupSyncPointsToMockDirectIO(); env_ = Env::Default(); fs_ = FileSystem::Default(); test_dir_ = test::PerThreadDBPath("random_access_file_reader_test"); @@ -27,15 +27,6 @@ class RandomAccessFileReaderTest : public testing::Test { EXPECT_OK(test::DestroyDir(env_, test_dir_)); } - bool IsDirectIOSupported() { - Write(".direct", ""); - FileOptions opt; - opt.use_direct_reads = true; - std::unique_ptr f; - auto s = fs_->NewRandomAccessFile(Path(".direct"), opt, &f, nullptr); - return s.ok(); - } - void Write(const std::string& fname, const std::string& content) { std::unique_ptr f; ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr)); @@ -84,11 +75,6 @@ class RandomAccessFileReaderTest : public testing::Test { }; TEST_F(RandomAccessFileReaderTest, ReadDirectIO) { - if (!IsDirectIOSupported()) { - printf("Direct IO is not supported, skip this test\n"); - return; - } - std::string fname = "read-direct-io"; Random rand(0); std::string content; @@ -113,11 +99,6 @@ TEST_F(RandomAccessFileReaderTest, ReadDirectIO) { } TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { - if (!IsDirectIOSupported()) { - printf("Direct IO is not supported, skip this test\n"); - return; - } - // Creates a file with 3 pages. 
std::string fname = "multi-read-direct-io"; Random rand(0); diff --git a/src.mk b/src.mk index 1c28f3e0e..cb0c5883e 100644 --- a/src.mk +++ b/src.mk @@ -415,6 +415,7 @@ MAIN_SOURCES = \ monitoring/stats_history_test.cc \ options/options_test.cc \ table/block_based/block_based_filter_block_test.cc \ + table/block_based/block_based_table_reader_test.cc \ table/block_based/block_test.cc \ table/block_based/data_block_hash_index_test.cc \ table/block_based/full_filter_block_test.cc \ diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index ad1b7e26a..aceb09ea5 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -1594,7 +1594,7 @@ void BlockBasedTable::RetrieveMultipleBlocks( size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit; MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options); - if (file->use_direct_io() || ioptions.allow_mmap_reads) { + if (ioptions.allow_mmap_reads) { size_t idx_in_batch = 0; for (auto mget_iter = batch->begin(); mget_iter != batch->end(); ++mget_iter, ++idx_in_batch) { @@ -1614,6 +1614,10 @@ void BlockBasedTable::RetrieveMultipleBlocks( return; } + // In direct IO mode, blocks share the direct io buffer. + // Otherwise, blocks share the scratch buffer. + const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr; + autovector read_reqs; size_t buf_offset = 0; size_t idx_in_batch = 0; @@ -1634,7 +1638,11 @@ void BlockBasedTable::RetrieveMultipleBlocks( // If current block is adjacent to the previous one, at the same time, // compression is enabled and there is no compressed cache, we combine // the two block read as one. - if (scratch != nullptr && prev_end == handle.offset()) { + // We don't combine block reads here in direct IO mode, because when doing + // direct IO read, the block requests will be realigned and merged when + // necessary. 
+ if (use_shared_buffer && !file->use_direct_io() && + prev_end == handle.offset()) { req_offset_for_block.emplace_back(prev_len); prev_len += block_size(handle); } else { @@ -1644,11 +1652,13 @@ void BlockBasedTable::RetrieveMultipleBlocks( FSReadRequest req; req.offset = prev_offset; req.len = prev_len; - if (scratch == nullptr) { - req.scratch = new char[req.len]; - } else { + if (file->use_direct_io()) { + req.scratch = nullptr; + } else if (use_shared_buffer) { req.scratch = scratch + buf_offset; buf_offset += req.len; + } else { + req.scratch = new char[req.len]; } read_reqs.emplace_back(req); } @@ -1665,14 +1675,17 @@ void BlockBasedTable::RetrieveMultipleBlocks( FSReadRequest req; req.offset = prev_offset; req.len = prev_len; - if (scratch == nullptr) { - req.scratch = new char[req.len]; - } else { + if (file->use_direct_io()) { + req.scratch = nullptr; + } else if (use_shared_buffer) { req.scratch = scratch + buf_offset; + } else { + req.scratch = new char[req.len]; } read_reqs.emplace_back(req); } + AlignedBuf direct_io_buf; { IOOptions opts; IOStatus s = PrepareIOFromReadOptions(options, file->env(), opts); @@ -1681,7 +1694,7 @@ void BlockBasedTable::RetrieveMultipleBlocks( req.status = s; } } else { - file->MultiRead(opts, &read_reqs[0], read_reqs.size(), nullptr); + file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf); } } @@ -1721,19 +1734,21 @@ void BlockBasedTable::RetrieveMultipleBlocks( " bytes, got " + ToString(req.result.size())); } - bool blocks_share_read_buffer = (req.result.size() != block_size(handle)); if (s.ok()) { - if (scratch == nullptr && !blocks_share_read_buffer) { + if (!use_shared_buffer) { // We allocated a buffer for this block. 
Give ownership of it to // BlockContents so it can free the memory assert(req.result.data() == req.scratch); - std::unique_ptr raw_block(req.scratch + req_offset); + assert(req.result.size() == block_size(handle)); + assert(req_offset == 0); + std::unique_ptr raw_block(req.scratch); raw_block_contents = BlockContents(std::move(raw_block), handle.size()); } else { - // We used the scratch buffer which are shared by the blocks. + // We used the scratch buffer or direct io buffer + // which are shared by the blocks. // raw_block_contents does not have the ownership. raw_block_contents = - BlockContents(Slice(req.scratch + req_offset, handle.size())); + BlockContents(Slice(req.result.data() + req_offset, handle.size())); } #ifndef NDEBUG @@ -1757,16 +1772,15 @@ void BlockBasedTable::RetrieveMultipleBlocks( } if (s.ok()) { - // It handles a rare case: compression is set and these is no compressed - // cache (enable combined read). In this case, the scratch != nullptr. - // At the same time, some blocks are actually not compressed, - // since its compression space saving is smaller than the threshold. In - // this case, if the block shares the scratch memory, we need to copy it - // to the heap such that it can be added to the regular block cache. + // When the blocks share the same underlying buffer (scratch or direct io + // buffer), if the block is compressed, the shared buffer will be + // uncompressed into heap during uncompressing; otherwise, we need to + // manually copy the block into heap before inserting the block to block + // cache. 
CompressionType compression_type = raw_block_contents.get_compression_type(); - if (scratch != nullptr && compression_type == kNoCompression) { - Slice raw = Slice(req.scratch + req_offset, block_size(handle)); + if (use_shared_buffer && compression_type == kNoCompression) { + Slice raw = Slice(req.result.data() + req_offset, block_size(handle)); raw_block_contents = BlockContents( CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw), handle.size()); @@ -1808,8 +1822,10 @@ void BlockBasedTable::RetrieveMultipleBlocks( handle.size(), &contents, footer.version(), rep_->ioptions, memory_allocator); } else { - // There are two cases here: 1) caller uses the scratch buffer; 2) we - // use the requst buffer. If scratch buffer is used, we ensure that + // There are two cases here: + // 1) caller uses the shared buffer (scratch or direct io buffer); + // 2) we use the request buffer. + // If scratch buffer or direct io buffer is used, we ensure that // all raw blocks are copyed to the heap as single blocks. If scratch // buffer is not used, we also have no combined read, so the raw // block can be used directly. @@ -2512,6 +2528,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options, if (total_len) { char* scratch = nullptr; + // If using direct IO, then scratch is not used, so keep it nullptr. // If the blocks need to be uncompressed and we don't need the // compressed blocks, then we can use a contiguous block of // memory to read in all the blocks as it will be temporary @@ -2521,7 +2538,8 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options, // 2. If blocks are uncompressed, alloc heap bufs // 3. 
If blocks are compressed and no compressed block cache, use // stack buf - if (rep_->table_options.block_cache_compressed == nullptr && + if (!rep_->file->use_direct_io() && + rep_->table_options.block_cache_compressed == nullptr && rep_->blocks_maybe_compressed) { if (total_len <= kMultiGetReadStackBufSize) { scratch = stack_buf; diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc new file mode 100644 index 000000000..85da3ce09 --- /dev/null +++ b/table/block_based/block_based_table_reader_test.cc @@ -0,0 +1,235 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "table/block_based/block_based_table_reader.h" + +#include "db/table_properties_collector.h" +#include "options/options_helper.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "table/block_based/block_based_table_builder.h" +#include "table/block_based/block_based_table_factory.h" +#include "table/format.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +class BlockBasedTableReaderTest + : public testing::Test, + public testing::WithParamInterface> { + protected: + CompressionType compression_type_; + bool use_direct_reads_; + + void SetUp() override { + std::tie(compression_type_, use_direct_reads_) = GetParam(); + + test::SetupSyncPointsToMockDirectIO(); + test_dir_ = test::PerThreadDBPath("block_based_table_reader_test"); + env_ = Env::Default(); + fs_ = FileSystem::Default(); + ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr)); + } + + void TearDown() override { EXPECT_OK(test::DestroyDir(env_, test_dir_)); } + + // Creates a table with the specified key value pairs (kv). 
+ void CreateTable(const std::string& table_name, + const CompressionType& compression_type, + const std::map& kv) { + std::unique_ptr writer; + NewFileWriter(table_name, &writer); + + // Create table builder. + Options options; + ImmutableCFOptions ioptions(options); + InternalKeyComparator comparator(options.comparator); + ColumnFamilyOptions cf_options; + MutableCFOptions moptions(cf_options); + std::vector> factories; + std::unique_ptr table_builder(table_factory_.NewTableBuilder( + TableBuilderOptions(ioptions, moptions, comparator, &factories, + compression_type, 0 /* sample_for_compression */, + CompressionOptions(), false /* skip_filters */, + kDefaultColumnFamilyName, -1 /* level */), + 0 /* column_family_id */, writer.get())); + + // Build table. + for (auto it = kv.begin(); it != kv.end(); it++) { + std::string k = ToInternalKey(it->first); + std::string v = it->second; + table_builder->Add(k, v); + } + ASSERT_OK(table_builder->Finish()); + } + + void NewBlockBasedTableReader(const FileOptions& foptions, + const ImmutableCFOptions& ioptions, + const InternalKeyComparator& comparator, + const std::string& table_name, + std::unique_ptr* table) { + std::unique_ptr file; + NewFileReader(table_name, foptions, &file); + + uint64_t file_size = 0; + ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size)); + + std::unique_ptr table_reader; + ASSERT_OK(BlockBasedTable::Open(ioptions, EnvOptions(), + table_factory_.table_options(), comparator, + std::move(file), file_size, &table_reader)); + + table->reset(reinterpret_cast(table_reader.release())); + } + + private: + std::string test_dir_; + Env* env_; + std::shared_ptr fs_; + BlockBasedTableFactory table_factory_; + + std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; } + + void WriteToFile(const std::string& content, const std::string& filename) { + std::unique_ptr f; + ASSERT_OK(fs_->NewWritableFile(Path(filename), FileOptions(), &f, nullptr)); + ASSERT_OK(f->Append(content, 
IOOptions(), nullptr)); + ASSERT_OK(f->Close(IOOptions(), nullptr)); + } + + void NewFileWriter(const std::string& filename, + std::unique_ptr* writer) { + std::string path = Path(filename); + EnvOptions env_options; + FileOptions foptions; + std::unique_ptr file; + ASSERT_OK(fs_->NewWritableFile(path, foptions, &file, nullptr)); + writer->reset(new WritableFileWriter(std::move(file), path, env_options)); + } + + void NewFileReader(const std::string& filename, const FileOptions& opt, + std::unique_ptr* reader) { + std::string path = Path(filename); + std::unique_ptr f; + ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr)); + reader->reset(new RandomAccessFileReader(std::move(f), path, env_)); + } + + std::string ToInternalKey(const std::string& key) { + InternalKey internal_key(key, 0, ValueType::kTypeValue); + return internal_key.Encode().ToString(); + } +}; + +// Tests MultiGet in both direct IO and non-direct IO mode. +// The keys should be in cache after MultiGet. +TEST_P(BlockBasedTableReaderTest, MultiGet) { + // Prepare key-value pairs to occupy multiple blocks. + // Each value is 256B, every 16 pairs constitute 1 block. + // Adjacent blocks contain values with different compression complexity: + // human readable strings are easier to compress than random strings. + std::map kv; + { + Random rnd(101); + uint32_t key = 0; + for (int block = 0; block < 100; block++) { + for (int i = 0; i < 16; i++) { + char k[9] = {0}; + // Internal key is constructed directly from this key, + // and internal key size is required to be >= 8 bytes, + // so use %08u as the format string. + sprintf(k, "%08u", key); + std::string v; + if (block % 2) { + v = test::RandomHumanReadableString(&rnd, 256); + } else { + test::RandomString(&rnd, 256, &v); + } + kv[std::string(k)] = v; + key++; + } + } + } + + // Prepare keys, values, and statuses for MultiGet. 
+ autovector keys; + autovector values; + autovector statuses; + { + const int step = + static_cast(kv.size()) / MultiGetContext::MAX_BATCH_SIZE; + auto it = kv.begin(); + for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE; i++) { + keys.emplace_back(it->first); + values.emplace_back(); + statuses.emplace_back(); + std::advance(it, step); + } + } + + std::string table_name = + "BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_); + CreateTable(table_name, compression_type_, kv); + + std::unique_ptr table; + Options options; + ImmutableCFOptions ioptions(options); + FileOptions foptions; + foptions.use_direct_reads = use_direct_reads_; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table); + + // Ensure that keys are not in cache before MultiGet. + for (auto& key : keys) { + ASSERT_FALSE(table->TEST_KeyInCache(ReadOptions(), key)); + } + + // Prepare MultiGetContext. + autovector get_context; + autovector key_context; + autovector sorted_keys; + for (size_t i = 0; i < keys.size(); ++i) { + get_context.emplace_back( + BytewiseComparator(), nullptr, nullptr, nullptr, GetContext::kNotFound, + keys[i], &values[i], nullptr, nullptr, nullptr, true /* do_merge */, + nullptr, nullptr, nullptr, nullptr, nullptr, nullptr); + key_context.emplace_back(nullptr, keys[i], &values[i], nullptr, + &statuses[i]); + key_context.back().get_context = &get_context.back(); + } + for (auto& key_ctx : key_context) { + sorted_keys.emplace_back(&key_ctx); + } + MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, ReadOptions()); + + // Execute MultiGet. + MultiGetContext::Range range = ctx.GetMultiGetRange(); + table->MultiGet(ReadOptions(), &range, nullptr); + + for (const Status& status : statuses) { + ASSERT_OK(status); + } + // Check that keys are in cache after MultiGet. 
+ for (size_t i = 0; i < keys.size(); i++) { + ASSERT_TRUE(table->TEST_KeyInCache(ReadOptions(), keys[i])); + ASSERT_EQ(values[i].ToString(), kv[keys[i].ToString()]); + } +} + +// Param 1: compression type +// Param 2: whether to use direct reads +INSTANTIATE_TEST_CASE_P( + MultiGet, BlockBasedTableReaderTest, + ::testing::Combine(::testing::ValuesIn(GetSupportedCompressions()), + ::testing::Bool())); + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/table/block_fetcher_test.cc b/table/block_fetcher_test.cc index 64655639f..632b3e52f 100644 --- a/table/block_fetcher_test.cc +++ b/table/block_fetcher_test.cc @@ -60,18 +60,15 @@ struct TestStats { class BlockFetcherTest : public testing::Test { protected: void SetUp() override { - test::ResetTmpDirForDirectIO(); + test::SetupSyncPointsToMockDirectIO(); test_dir_ = test::PerThreadDBPath("block_fetcher_test"); env_ = Env::Default(); fs_ = FileSystem::Default(); ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr)); - is_direct_io_supported_ = DetectDirectIOSupport(); } void TearDown() override { EXPECT_OK(test::DestroyDir(env_, test_dir_)); } - bool IsDirectIOSupported() const { return is_direct_io_supported_; } - void AssertSameBlock(const BlockContents& block1, const BlockContents& block2) { ASSERT_EQ(block1.data.ToString(), block2.data.ToString()); @@ -141,11 +138,6 @@ class BlockFetcherTest : public testing::Test { bool do_uncompress, const TestStats& expected_non_direct_io_stats, const TestStats& expected_direct_io_stats) { - if (!IsDirectIOSupported()) { - printf("Skip this test since direct IO is not supported\n"); - return; - } - for (CompressionType compression_type : GetSupportedCompressions()) { bool do_compress = compression_type != kNoCompression; if (compressed != do_compress) continue; @@ -212,7 +204,6 @@ class BlockFetcherTest : public 
testing::Test { Env* env_; std::shared_ptr fs_; BlockBasedTableFactory table_factory_; - bool is_direct_io_supported_; std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; } @@ -223,15 +214,6 @@ class BlockFetcherTest : public testing::Test { ASSERT_OK(f->Close(IOOptions(), nullptr)); } - bool DetectDirectIOSupport() { - WriteToFile("", ".direct"); - FileOptions opt; - opt.use_direct_reads = true; - std::unique_ptr f; - auto s = fs_->NewRandomAccessFile(Path(".direct"), opt, &f, nullptr); - return s.ok(); - } - void NewFileWriter(const std::string& filename, std::unique_ptr* writer) { std::string path = Path(filename); @@ -321,11 +303,6 @@ class BlockFetcherTest : public testing::Test { MemoryAllocator* heap_buf_allocator, MemoryAllocator* compressed_buf_allocator, BlockContents* block, MemcpyStats* memcpy_stats) { - if (use_direct_io && !IsDirectIOSupported()) { - printf("Skip this test since direct IO is not supported\n"); - return; - } - Options options; ImmutableCFOptions ioptions(options); InternalKeyComparator comparator(options.comparator); @@ -366,11 +343,6 @@ class BlockFetcherTest : public testing::Test { // Expects: // the index block contents are the same for both read modes. 
TEST_F(BlockFetcherTest, FetchIndexBlock) { - if (!IsDirectIOSupported()) { - printf("Skip this test since direct IO is not supported\n"); - return; - } - for (CompressionType compression : GetSupportedCompressions()) { std::string table_name = "FetchIndexBlock" + CompressionTypeToString(compression); diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 4c6ef0999..16d02d6a6 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -510,19 +510,6 @@ size_t GetLinesCount(const std::string& fname, const std::string& pattern) { return count; } -void ResetTmpDirForDirectIO() { -#ifdef OS_LINUX - unsetenv("TEST_TMPDIR"); - char* tmpdir = getenv("DISK_TEMP_DIR"); - if (tmpdir == nullptr) { - tmpdir = getenv("HOME"); - } - if (tmpdir != nullptr) { - setenv("TEST_TMPDIR", tmpdir, 1); - } -#endif -} - void SetupSyncPointsToMockDirectIO() { #if !defined(NDEBUG) && !defined(OS_MACOSX) && !defined(OS_WIN) && \ !defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD)