From f9f4d40f935971d427f87141261eea52539b6bb8 Mon Sep 17 00:00:00 2001
From: Anand Ananthabhotla
Date: Mon, 26 Mar 2018 20:14:24 -0700
Subject: [PATCH] Align SST file data blocks to avoid spanning multiple pages

Summary:
Provide a block_align option in BlockBasedTableOptions to allow alignment of
SST file data blocks. This avoids the higher IOPS/throughput load incurred
when data blocks smaller than 4KB span two 4KB pages. When this option is set
to true, the block alignment is set to the lower of the block size and 4KB.
Closes https://github.com/facebook/rocksdb/pull/3502

Differential Revision: D7400897

Pulled By: anand1976

fbshipit-source-id: 04cc3bd144e88e3431a4f97604e63ad7a0f06d44
---
 HISTORY.md                         |   1 +
 include/rocksdb/table.h            |   3 +
 options/options_settable_test.cc   |   3 +-
 table/block_based_table_builder.cc |  19 +++++-
 table/block_based_table_builder.h  |   3 +-
 table/block_based_table_factory.cc |  12 ++++
 table/block_based_table_factory.h  |   3 +
 table/flush_block_policy.cc        |  19 ++++--
 table/table_test.cc                | 104 +++++++++++++++++++++++++++++
 tools/db_bench_tool.cc             |   4 ++
 util/aligned_buffer.h              |   6 ++
 util/file_reader_writer.cc         |  25 +++++++
 util/file_reader_writer.h          |   2 +
 13 files changed, 195 insertions(+), 9 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index 44e408485..b6e479d16 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,6 +1,7 @@
 # Rocksdb Change Log
 ## Unreleased
 ### Public API Change
+* Add a BlockBasedTableOption to align uncompressed data blocks on the smaller of block size or page size boundary, to reduce flash reads by avoiding reads spanning 4K pages.
 
 ### New Features
 
diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h
index 616505753..a86c763c0 100644
--- a/include/rocksdb/table.h
+++ b/include/rocksdb/table.h
@@ -222,6 +222,9 @@ struct BlockBasedTableOptions {
   // false will avoid the overhead of decompression if index blocks are evicted
   // and read back
   bool enable_index_compression = true;
+
+  // Align data blocks on the lesser of page size and block size
+  bool block_align = false;
 };
 
 // Table Properties that are specific to block-based table properties.
diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc
index d64473c3a..11f708fe7 100644
--- a/options/options_settable_test.cc
+++ b/options/options_settable_test.cc
@@ -151,7 +151,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
       "format_version=1;"
       "hash_index_allow_collision=false;"
       "verify_compression=true;read_amp_bytes_per_bit=0;"
-      "enable_index_compression=false",
+      "enable_index_compression=false;"
+      "block_align=true",
       new_bbto));
 
   ASSERT_EQ(unset_bytes_base,
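Usage sketch (illustrative, not part of the patch; the DB path and values are
arbitrary, but the types and functions are the existing public RocksDB API):
callers opt in by setting the new field before building the table factory.

    #include <cassert>

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"
    #include "rocksdb/table.h"

    int main() {
      rocksdb::BlockBasedTableOptions bbto;
      bbto.block_size = 4096;  // power of 2, so alignment = min(4096, 4KB)
      bbto.block_align = true;
      rocksdb::Options options;
      options.create_if_missing = true;
      // block_align requires uncompressed data blocks
      // (see the SanitizeOptions() checks added below).
      options.compression = rocksdb::kNoCompression;
      options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbto));
      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/block_align_demo", &db);
      assert(s.ok());
      delete db;
      return 0;
    }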
diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc
index 9908257b6..607d2a534 100644
--- a/table/block_based_table_builder.cc
+++ b/table/block_based_table_builder.cc
@@ -249,6 +249,7 @@ struct BlockBasedTableBuilder::Rep {
   WritableFileWriter* file;
   uint64_t offset = 0;
   Status status;
+  size_t alignment;
   BlockBuilder data_block;
 
   BlockBuilder range_del_block;
@@ -294,6 +295,9 @@ struct BlockBasedTableBuilder::Rep {
         table_options(table_opt),
         internal_comparator(icomparator),
         file(f),
+        alignment(table_options.block_align
+                      ? std::min(table_options.block_size, kDefaultPageSize)
+                      : 0),
         data_block(table_options.block_restart_interval,
                    table_options.use_delta_encoding),
         range_del_block(1),  // TODO(andrewkr): restart_interval unnecessary
@@ -537,13 +541,14 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
     RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
   }
 
-  WriteRawBlock(block_contents, type, handle);
+  WriteRawBlock(block_contents, type, handle, is_data_block);
   r->compressed_output.clear();
 }
 
 void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
                                            CompressionType type,
-                                           BlockHandle* handle) {
+                                           BlockHandle* handle,
+                                           bool is_data_block) {
   Rep* r = rep_;
   StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
   handle->set_offset(r->offset);
@@ -581,6 +586,16 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
     }
     if (r->status.ok()) {
       r->offset += block_contents.size() + kBlockTrailerSize;
+      if (r->table_options.block_align && is_data_block) {
+        size_t pad_bytes =
+            (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
+                             (r->alignment - 1))) &
+            (r->alignment - 1);
+        r->status = r->file->Pad(pad_bytes);
+        if (r->status.ok()) {
+          r->offset += pad_bytes;
+        }
+      }
     }
   }
 }
diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h
index 36dfce1f0..392dedc1f 100644
--- a/table/block_based_table_builder.h
+++ b/table/block_based_table_builder.h
@@ -96,7 +96,8 @@ class BlockBasedTableBuilder : public TableBuilder {
   void WriteBlock(const Slice& block_contents, BlockHandle* handle,
                   bool is_data_block);
   // Directly write data to the file.
-  void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);
+  void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle,
+                     bool is_data_block = false);
   Status InsertBlockInCache(const Slice& block_contents,
                             const CompressionType type,
                             const BlockHandle* handle);
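The pad_bytes expression above is the standard power-of-two round-up. A
self-contained sketch with a worked example (the helper name is illustrative;
kBlockTrailerSize is 5 bytes in this format: a compression-type byte plus a
32-bit checksum):

    #include <cassert>
    #include <cstddef>

    // Padding needed to round `written` up to the next multiple of
    // `alignment` (a power of 2); zero when already aligned.
    size_t PadBytes(size_t written, size_t alignment) {
      return (alignment - (written & (alignment - 1))) & (alignment - 1);
    }

    int main() {
      const size_t kTrailer = 5;  // block trailer: type byte + crc32
      assert(PadBytes(3500 + kTrailer, 4096) == 591);  // 3505 + 591 == 4096
      assert(PadBytes(4096, 4096) == 0);  // aligned block gets no padding
      return 0;
    }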
diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc
index 7a74bcd5b..d468eaa24 100644
--- a/table/block_based_table_factory.cc
+++ b/table/block_based_table_factory.cc
@@ -114,6 +114,15 @@ Status BlockBasedTableFactory::SanitizeOptions(
         "Unsupported BlockBasedTable format_version. Please check "
         "include/rocksdb/table.h for more info");
   }
+  if (table_options_.block_align && (cf_opts.compression != kNoCompression)) {
+    return Status::InvalidArgument(
+        "Cannot enable block_align when compression is enabled");
+  }
+  if (table_options_.block_align &&
+      (table_options_.block_size & (table_options_.block_size - 1))) {
+    return Status::InvalidArgument(
+        "Block alignment requested but block size is not a power of 2");
+  }
   return Status::OK();
 }
 
@@ -225,6 +234,9 @@ std::string BlockBasedTableFactory::GetPrintableTableOptions() const {
   snprintf(buffer, kBufferSize, "  enable_index_compression: %d\n",
            table_options_.enable_index_compression);
   ret.append(buffer);
+  snprintf(buffer, kBufferSize, "  block_align: %d\n",
+           table_options_.block_align);
+  ret.append(buffer);
   return ret;
 }
 
diff --git a/table/block_based_table_factory.h b/table/block_based_table_factory.h
index a5eba7eff..b9d3a97d6 100644
--- a/table/block_based_table_factory.h
+++ b/table/block_based_table_factory.h
@@ -155,6 +155,9 @@ static std::unordered_map<std::string, OptionTypeInfo>
       OptionType::kSizeT, OptionVerificationType::kNormal, false, 0}},
      {"enable_index_compression",
       {offsetof(struct BlockBasedTableOptions, enable_index_compression),
+       OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
+     {"block_align",
+      {offsetof(struct BlockBasedTableOptions, block_align),
        OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}};
 #endif  // !ROCKSDB_LITE
 }  // namespace rocksdb
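The second SanitizeOptions() check relies on the usual x & (x - 1) trick; a
standalone sketch (the helper name is illustrative):

    #include <cassert>
    #include <cstdint>

    // Nonzero powers of 2 have a single set bit, so clearing the lowest set
    // bit with x & (x - 1) leaves zero exactly for them.
    bool IsPowerOfTwo(uint64_t x) { return x != 0 && (x & (x - 1)) == 0; }

    int main() {
      assert(IsPowerOfTwo(4096));   // accepted with block_align
      assert(!IsPowerOfTwo(4000));  // rejected, as the BadOptions test verifies
      return 0;
    }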
diff --git a/table/flush_block_policy.cc b/table/flush_block_policy.cc
index 9a8dea4cb..d2a4b9627 100644
--- a/table/flush_block_policy.cc
+++ b/table/flush_block_policy.cc
@@ -3,10 +3,11 @@
 // COPYING file in the root directory) and Apache 2.0 License
 // (found in the LICENSE.Apache file in the root directory).
 
-#include "rocksdb/options.h"
 #include "rocksdb/flush_block_policy.h"
+#include "rocksdb/options.h"
 #include "rocksdb/slice.h"
 #include "table/block_builder.h"
+#include "table/format.h"
 
 #include <cassert>
 
@@ -21,10 +22,12 @@ class FlushBlockBySizePolicy : public FlushBlockPolicy {
   // reaches the configured
   FlushBlockBySizePolicy(const uint64_t block_size,
                          const uint64_t block_size_deviation,
+                         const bool align,
                          const BlockBuilder& data_block_builder)
       : block_size_(block_size),
         block_size_deviation_limit_(
             ((block_size * (100 - block_size_deviation)) + 99) / 100),
+        align_(align),
         data_block_builder_(data_block_builder) {}
 
   virtual bool Update(const Slice& key,
@@ -51,8 +54,13 @@ class FlushBlockBySizePolicy : public FlushBlockPolicy {
     }
 
     const auto curr_size = data_block_builder_.CurrentSizeEstimate();
-    const auto estimated_size_after =
-        data_block_builder_.EstimateSizeAfterKV(key, value);
+    auto estimated_size_after =
+        data_block_builder_.EstimateSizeAfterKV(key, value);
+
+    if (align_) {
+      estimated_size_after += kBlockTrailerSize;
+      return estimated_size_after > block_size_;
+    }
 
     return estimated_size_after > block_size_ &&
            curr_size > block_size_deviation_limit_;
@@ -60,6 +68,7 @@ class FlushBlockBySizePolicy : public FlushBlockPolicy {
 
   const uint64_t block_size_;
   const uint64_t block_size_deviation_limit_;
+  const bool align_;
   const BlockBuilder& data_block_builder_;
 };
 
@@ -68,13 +77,13 @@ FlushBlockPolicy* FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
     const BlockBuilder& data_block_builder) const {
   return new FlushBlockBySizePolicy(
       table_options.block_size, table_options.block_size_deviation,
-      data_block_builder);
+      table_options.block_align, data_block_builder);
 }
 
 FlushBlockPolicy* FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
     const uint64_t size, const int deviation,
     const BlockBuilder& data_block_builder) {
-  return new FlushBlockBySizePolicy(size, deviation, data_block_builder);
+  return new FlushBlockBySizePolicy(size, deviation, false, data_block_builder);
 }
 
 }  // namespace rocksdb
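In align mode the policy above charges each block for its 5-byte trailer and
skips the deviation heuristic, cutting the block as soon as block-plus-trailer
would overflow block_size, so the trailer never pushes a block across the
alignment boundary. A minimal model of that decision (free function and names
are illustrative, not the RocksDB class):

    #include <cassert>
    #include <cstdint>

    const uint64_t kTrailerSize = 5;  // type byte + 32-bit checksum

    // Align mode: flush before adding a key/value if the block plus its
    // trailer would no longer fit within block_size.
    bool ShouldFlushAligned(uint64_t estimated_size_after_kv,
                            uint64_t block_size) {
      return estimated_size_after_kv + kTrailerSize > block_size;
    }

    int main() {
      assert(!ShouldFlushAligned(4091, 4096));  // 4091 + 5 == 4096 still fits
      assert(ShouldFlushAligned(4092, 4096));   // 4092 + 5 > 4096, cut here
      return 0;
    }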
diff --git a/table/table_test.cc b/table/table_test.cc
index e893201b1..bb4e7d850 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -3197,6 +3197,110 @@ TEST_F(BlockBasedTableTest, TableWithGlobalSeqno) {
   delete iter;
 }
 
+TEST_F(BlockBasedTableTest, BlockAlignTest) {
+  BlockBasedTableOptions bbto;
+  bbto.block_align = true;
+  test::StringSink* sink = new test::StringSink();
+  unique_ptr<WritableFileWriter> file_writer(test::GetWritableFileWriter(sink));
+  Options options;
+  options.compression = kNoCompression;
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  const ImmutableCFOptions ioptions(options);
+  InternalKeyComparator ikc(options.comparator);
+  std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
+      int_tbl_prop_collector_factories;
+  std::string column_family_name;
+  std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
+      TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories,
+                          kNoCompression, CompressionOptions(),
+                          nullptr /* compression_dict */,
+                          false /* skip_filters */, column_family_name, -1),
+      TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
+      file_writer.get()));
+
+  for (int i = 1; i <= 10000; ++i) {
+    std::ostringstream ostr;
+    ostr << std::setfill('0') << std::setw(5) << i;
+    std::string key = ostr.str();
+    std::string value = "val";
+    InternalKey ik(key, 0, kTypeValue);
+
+    builder->Add(ik.Encode(), value);
+  }
+  ASSERT_OK(builder->Finish());
+  file_writer->Flush();
+
+  test::RandomRWStringSink ss_rw(sink);
+  unique_ptr<RandomAccessFileReader> file_reader(
+      test::GetRandomAccessFileReader(
+          new test::StringSource(ss_rw.contents(), 73342, true)));
+
+  // Helper lambda to verify that every data block is 4KB-aligned
+  std::function<void()> VerifyBlockAlignment = [&]() {
+    TableProperties* props = nullptr;
+    ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(),
+                                  kBlockBasedTableMagicNumber, ioptions,
+                                  &props));
+
+    uint64_t data_block_size = props->data_size / props->num_data_blocks;
+    ASSERT_EQ(data_block_size, 4096);
+    ASSERT_EQ(props->data_size, data_block_size * props->num_data_blocks);
+    delete props;
+  };
+
+  VerifyBlockAlignment();
+
+  // The below block of code verifies that we can read back the keys. Set
+  // block_align to false when creating the reader to ensure we can flip
+  // between the two modes without any issues.
+  std::unique_ptr<TableReader> table_reader;
+  bbto.block_align = false;
+  Options options2;
+  options2.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  ImmutableCFOptions ioptions2(options2);
+  ASSERT_OK(ioptions.table_factory->NewTableReader(
+      TableReaderOptions(ioptions2, EnvOptions(),
+                         GetPlainInternalComparator(options2.comparator)),
+      std::move(file_reader), ss_rw.contents().size(), &table_reader));
+
+  std::unique_ptr<InternalIterator> db_iter(
+      table_reader->NewIterator(ReadOptions()));
+
+  int expected_key = 1;
+  for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
+    std::ostringstream ostr;
+    ostr << std::setfill('0') << std::setw(5) << expected_key++;
+    std::string key = ostr.str();
+    std::string value = "val";
+
+    ASSERT_OK(db_iter->status());
+    ASSERT_EQ(ExtractUserKey(db_iter->key()).ToString(), key);
+    ASSERT_EQ(db_iter->value().ToString(), value);
+  }
+  expected_key--;
+  ASSERT_EQ(expected_key, 10000);
+  table_reader.reset();
+}
+
+TEST_F(BlockBasedTableTest, BadOptions) {
+  rocksdb::Options options;
+  options.compression = kNoCompression;
+  rocksdb::BlockBasedTableOptions bbto;
+  bbto.block_size = 4000;
+  bbto.block_align = true;
+
+  const std::string kDBPath = test::TmpDir() + "/table_prefix_test";
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  DestroyDB(kDBPath, options);
+  rocksdb::DB* db;
+  ASSERT_NOK(rocksdb::DB::Open(options, kDBPath, &db));
+
+  bbto.block_size = 4096;
+  options.compression = kSnappyCompression;
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  ASSERT_NOK(rocksdb::DB::Open(options, kDBPath, &db));
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 855e0764f..10b3c364b 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -452,6 +452,9 @@ DEFINE_bool(enable_index_compression,
             rocksdb::BlockBasedTableOptions().enable_index_compression,
             "Compress the index block");
 
+DEFINE_bool(block_align, rocksdb::BlockBasedTableOptions().block_align,
+            "Align data blocks on page size");
+
 DEFINE_int64(compressed_cache_size, -1,
              "Number of bytes to use as a cache of compressed data.");
 
@@ -3140,6 +3143,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       block_based_options.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;
       block_based_options.enable_index_compression =
          FLAGS_enable_index_compression;
+      block_based_options.block_align = FLAGS_block_align;
       if (FLAGS_read_cache_path != "") {
 #ifndef ROCKSDB_LITE
         Status rc_status;
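With the flag wired into db_bench, alignment can be exercised end to end from
the command line; a plausible invocation (compression must be disabled to pass
the SanitizeOptions() checks above):

    ./db_bench --benchmarks=fillseq --compression_type=none --block_align=true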
diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h
index 8d4a0be58..0c36eca9d 100644
--- a/util/aligned_buffer.h
+++ b/util/aligned_buffer.h
@@ -161,6 +161,12 @@ public:
     }
   }
 
+  void PadWith(size_t pad_size, int padding) {
+    assert((pad_size + cursize_) <= capacity_);
+    memset(bufstart_ + cursize_, padding, pad_size);
+    cursize_ += pad_size;
+  }
+
   // After a partial flush move the tail to the beginning of the buffer
   void RefitTail(size_t tail_offset, size_t tail_size) {
     if (tail_size > 0) {
diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc
index 38228a8e8..9d4298b1e 100644
--- a/util/file_reader_writer.cc
+++ b/util/file_reader_writer.cc
@@ -219,6 +219,31 @@ Status WritableFileWriter::Append(const Slice& data) {
   return s;
 }
 
+Status WritableFileWriter::Pad(const size_t pad_bytes) {
+  assert(pad_bytes < kDefaultPageSize);
+  size_t left = pad_bytes;
+  size_t cap = buf_.Capacity() - buf_.CurrentSize();
+
+  // Assume pad_bytes is small compared to buf_'s capacity, so we always
+  // go through buf_ rather than writing directly to the file, as Append()
+  // does in some cases.
+  while (left) {
+    size_t append_bytes = std::min(cap, left);
+    buf_.PadWith(append_bytes, 0);
+    left -= append_bytes;
+    if (left > 0) {
+      Status s = Flush();
+      if (!s.ok()) {
+        return s;
+      }
+    }
+    cap = buf_.Capacity() - buf_.CurrentSize();
+  }
+  pending_sync_ = true;
+  filesize_ += pad_bytes;
+  return Status::OK();
+}
+
 Status WritableFileWriter::Close() {
 
   // Do not quit immediately on failure the file MUST be closed
diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h
index 9bc3b9437..9db12ba06 100644
--- a/util/file_reader_writer.h
+++ b/util/file_reader_writer.h
@@ -166,6 +166,8 @@ class WritableFileWriter {
 
   Status Append(const Slice& data);
 
+  Status Pad(const size_t pad_bytes);
+
   Status Flush();
 
   Status Close();
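Pad() is essentially a zero-filled Append(). A toy model of its loop, with
std::string standing in for both AlignedBuffer and the file (illustrative
only, not the RocksDB types):

    #include <algorithm>
    #include <cassert>
    #include <string>

    // Fill `pad_bytes` zeros through a fixed-capacity buffer, "flushing"
    // to `file` whenever the buffer fills, as WritableFileWriter::Pad() does.
    void PadModel(std::string& file, std::string& buf, size_t cap,
                  size_t pad_bytes) {
      size_t left = pad_bytes;
      while (left > 0) {
        size_t chunk = std::min(cap - buf.size(), left);
        buf.append(chunk, '\0');  // AlignedBuffer::PadWith(chunk, 0)
        left -= chunk;
        if (left > 0) {           // buffer is full; flush and continue
          file += buf;
          buf.clear();
        }
      }
    }

    int main() {
      std::string file, buf;
      PadModel(file, buf, 16, 40);  // 40 pad bytes through a 16-byte buffer
      assert(file.size() + buf.size() == 40);
      return 0;
    }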