Expand effect of dictionary settings in `ColumnFamilyOptions::compression_opts` (#7619)

Summary:
In dictionary compression's initial implementation, in order to save CPU overhead, we only enabled it
for bottom level under the assumption that the vast majority of data is
stored there. At that time, there was no
such thing as `ColumnFamilyOptions::bottommost_compression_opts`, so we just
hardcoded disabling dictionary compression in flush and compactions to
non-bottommost level. Now, we have users who generate all their files
through flush and are considering using dictionary compression.

To support such a use case, this PR expands the scope of `ColumnFamilyOptions::compression_opts` to
additionally include flushed files and files generated by compaction to
a non-bottommost level. Users can still get the old behavior by moving
their dictionary settings to `ColumnFamilyOptions::bottommost_compression_opts`
and explicitly enabling both that and `ColumnFamilyOptions::bottommost_compression`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7619

Reviewed By: ltamasi

Differential Revision: D24665610

Pulled By: ajkr

fbshipit-source-id: 656b90bce1033fe21c71e09af931ef5bde3e464c
main
Andrew Kryczka 4 years ago committed by Facebook GitHub Bot
parent a388c8cc6b
commit 1adbceb581
  1. 3
      HISTORY.md
  2. 7
      db/builder.cc
  3. 6
      db/compaction/compaction.cc
  4. 8
      db/db_block_cache_test.cc
  5. 173
      db/db_test2.cc
  6. 11
      options/options_helper.cc
  7. 2
      options/options_helper.h
  8. 37
      util/compression.h

@ -14,6 +14,9 @@
### Public API Change ### Public API Change
* Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options. * Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options.
### Behavior Changes
* The dictionary compression settings specified in `ColumnFamilyOptions::compression_opts` now additionally affect files generated by flush and compaction to non-bottommost level. Previously those settings at most affected files generated by compaction to bottommost level, depending on whether `ColumnFamilyOptions::bottommost_compression_opts` overrode them. Users who relied on dictionary compression settings in `ColumnFamilyOptions::compression_opts` affecting only the bottommost level can keep the behavior by moving their dictionary settings to `ColumnFamilyOptions::bottommost_compression_opts` and setting its `enabled` flag.
## 6.14 (10/09/2020) ## 6.14 (10/09/2020)
### Bug fixes ### Bug fixes
* Fixed a bug after a `CompactRange()` with `CompactRangeOptions::change_level` set fails due to a conflict in the level change step, which caused all subsequent calls to `CompactRange()` with `CompactRangeOptions::change_level` set to incorrectly fail with a `Status::NotSupported("another thread is refitting")` error. * Fixed a bug after a `CompactRange()` with `CompactRangeOptions::change_level` set fails due to a conflict in the level change step, which caused all subsequent calls to `CompactRange()` with `CompactRangeOptions::change_level` set to incorrectly fail with a `Status::NotSupported("another thread is refitting")` error.

@ -124,11 +124,6 @@ Status BuildTable(
if (iter->Valid() || !range_del_agg->IsEmpty()) { if (iter->Valid() || !range_del_agg->IsEmpty()) {
TableBuilder* builder; TableBuilder* builder;
std::unique_ptr<WritableFileWriter> file_writer; std::unique_ptr<WritableFileWriter> file_writer;
// Currently we only enable dictionary compression during compaction to the
// bottommost level.
CompressionOptions compression_opts_for_flush(compression_opts);
compression_opts_for_flush.max_dict_bytes = 0;
compression_opts_for_flush.zstd_max_train_bytes = 0;
{ {
std::unique_ptr<FSWritableFile> file; std::unique_ptr<FSWritableFile> file;
#ifndef NDEBUG #ifndef NDEBUG
@ -160,7 +155,7 @@ Status BuildTable(
ioptions, mutable_cf_options, internal_comparator, ioptions, mutable_cf_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id, int_tbl_prop_collector_factories, column_family_id,
column_family_name, file_writer.get(), compression, column_family_name, file_writer.get(), compression,
sample_for_compression, compression_opts_for_flush, level, sample_for_compression, compression_opts, level,
false /* skip_filters */, creation_time, oldest_key_time, false /* skip_filters */, creation_time, oldest_key_time,
0 /*target_file_size*/, file_creation_time, db_id, db_session_id); 0 /*target_file_size*/, file_creation_time, db_id, db_session_id);
} }

@ -248,12 +248,6 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
if (max_subcompactions_ == 0) { if (max_subcompactions_ == 0) {
max_subcompactions_ = _mutable_db_options.max_subcompactions; max_subcompactions_ = _mutable_db_options.max_subcompactions;
} }
if (!bottommost_level_) {
// Currently we only enable dictionary compression during compaction to the
// bottommost level.
output_compression_opts_.max_dict_bytes = 0;
output_compression_opts_.zstd_max_train_bytes = 0;
}
#ifndef NDEBUG #ifndef NDEBUG
for (size_t i = 1; i < inputs_.size(); ++i) { for (size_t i = 1; i < inputs_.size(); ++i) {

@ -837,8 +837,9 @@ TEST_F(DBBlockCacheTest, CacheCompressionDict) {
Random rnd(301); Random rnd(301);
for (auto compression_type : compression_types) { for (auto compression_type : compression_types) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.compression = compression_type; options.bottommost_compression = compression_type;
options.compression_opts.max_dict_bytes = 4096; options.bottommost_compression_opts.max_dict_bytes = 4096;
options.bottommost_compression_opts.enabled = true;
options.create_if_missing = true; options.create_if_missing = true;
options.num_levels = 2; options.num_levels = 2;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
@ -991,6 +992,9 @@ TEST_P(DBBlockCachePinningTest, TwoLevelDB) {
++expected_index_misses; ++expected_index_misses;
} }
} }
if (unpartitioned_pinning_ == PinningTier::kNone) {
++expected_compression_dict_misses;
}
ASSERT_EQ(expected_filter_misses, ASSERT_EQ(expected_filter_misses,
TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(expected_index_misses, ASSERT_EQ(expected_index_misses,

@ -12,6 +12,7 @@
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "db/read_callback.h" #include "db/read_callback.h"
#include "options/options_helper.h"
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/persistent_cache.h" #include "rocksdb/persistent_cache.h"
@ -1390,6 +1391,178 @@ TEST_F(DBTest2, PresetCompressionDictLocality) {
} }
} }
class PresetCompressionDictTest
: public DBTestBase,
public testing::WithParamInterface<std::tuple<CompressionType, bool>> {
public:
PresetCompressionDictTest()
: DBTestBase("/db_test2", false /* env_do_fsync */),
compression_type_(std::get<0>(GetParam())),
bottommost_(std::get<1>(GetParam())) {}
protected:
const CompressionType compression_type_;
const bool bottommost_;
};
INSTANTIATE_TEST_CASE_P(
DBTest2, PresetCompressionDictTest,
::testing::Combine(::testing::ValuesIn(GetSupportedDictCompressions()),
::testing::Bool()));
TEST_P(PresetCompressionDictTest, Flush) {
// Verifies that dictionary is generated and written during flush only when
// `ColumnFamilyOptions::compression` enables dictionary.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
}
options.memtable_factory.reset(new SpecialSkipListFactory(kKeysPerFile));
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
uint64_t prev_compression_dict_misses =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
Random rnd(301);
for (size_t i = 0; i <= kKeysPerFile; ++i) {
ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen)));
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// If there's a compression dictionary, it should have been loaded when the
// flush finished, incurring a cache miss.
uint64_t expected_compression_dict_misses;
if (bottommost_) {
expected_compression_dict_misses = prev_compression_dict_misses;
} else {
expected_compression_dict_misses = prev_compression_dict_misses + 1;
}
ASSERT_EQ(expected_compression_dict_misses,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
}
TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
// Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when `ColumnFamilyOptions::compression` enables
// dictionary.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
}
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
Random rnd(301);
for (size_t j = 0; j <= kKeysPerFile; ++j) {
ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
}
ASSERT_OK(Flush());
MoveFilesToLevel(2);
for (int i = 0; i < 2; ++i) {
for (size_t j = 0; j <= kKeysPerFile; ++j) {
ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
}
ASSERT_OK(Flush());
}
#ifndef ROCKSDB_LITE
ASSERT_EQ("2,0,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
uint64_t prev_compression_dict_misses =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
// This L0->L1 compaction merges the two L0 files into L1. The produced L1
// file is not bottommost due to the existing L2 file covering the same key-
// range.
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,1,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
// If there's a compression dictionary, it should have been loaded when the
// compaction finished, incurring a cache miss.
uint64_t expected_compression_dict_misses;
if (bottommost_) {
expected_compression_dict_misses = prev_compression_dict_misses;
} else {
expected_compression_dict_misses = prev_compression_dict_misses + 1;
}
ASSERT_EQ(expected_compression_dict_misses,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
}
TEST_P(PresetCompressionDictTest, CompactBottommost) {
// Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when either `ColumnFamilyOptions::compression` or
// `ColumnFamilyOptions::bottommost_compression` enables dictionary.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
}
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
Random rnd(301);
for (int i = 0; i < 2; ++i) {
for (size_t j = 0; j <= kKeysPerFile; ++j) {
ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
}
ASSERT_OK(Flush());
}
#ifndef ROCKSDB_LITE
ASSERT_EQ("2", FilesPerLevel(0));
#endif // ROCKSDB_LITE
uint64_t prev_compression_dict_misses =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
CompactRangeOptions cro;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
// If there's a compression dictionary, it should have been loaded when the
// compaction finished, incurring a cache miss.
ASSERT_EQ(prev_compression_dict_misses + 1,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
}
class CompactionCompressionListener : public EventListener { class CompactionCompressionListener : public EventListener {
public: public:
explicit CompactionCompressionListener(Options* db_options) explicit CompactionCompressionListener(Options* db_options)

@ -299,6 +299,17 @@ std::vector<CompressionType> GetSupportedCompressions() {
return supported_compressions; return supported_compressions;
} }
std::vector<CompressionType> GetSupportedDictCompressions() {
std::vector<CompressionType> dict_compression_types;
for (const auto& comp_to_name : OptionsHelper::compression_type_string_map) {
CompressionType t = comp_to_name.second;
if (t != kDisableCompressionOption && DictCompressionTypeSupported(t)) {
dict_compression_types.push_back(t);
}
}
return dict_compression_types;
}
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
bool ParseSliceTransformHelper( bool ParseSliceTransformHelper(
const std::string& kFixedPrefixName, const std::string& kCappedPrefixName, const std::string& kFixedPrefixName, const std::string& kCappedPrefixName,

@ -25,6 +25,8 @@ struct Options;
std::vector<CompressionType> GetSupportedCompressions(); std::vector<CompressionType> GetSupportedCompressions();
std::vector<CompressionType> GetSupportedDictCompressions();
// Checks that the combination of DBOptions and ColumnFamilyOptions are valid // Checks that the combination of DBOptions and ColumnFamilyOptions are valid
Status ValidateOptions(const DBOptions& db_opts, Status ValidateOptions(const DBOptions& db_opts,
const ColumnFamilyOptions& cf_opts); const ColumnFamilyOptions& cf_opts);

@ -540,6 +540,43 @@ inline bool CompressionTypeSupported(CompressionType compression_type) {
} }
} }
inline bool DictCompressionTypeSupported(CompressionType compression_type) {
switch (compression_type) {
case kNoCompression:
return false;
case kSnappyCompression:
return false;
case kZlibCompression:
return Zlib_Supported();
case kBZip2Compression:
return false;
case kLZ4Compression:
case kLZ4HCCompression:
#if LZ4_VERSION_NUMBER >= 10400 // r124+
return LZ4_Supported();
#else
return false;
#endif
case kXpressCompression:
return false;
case kZSTDNotFinalCompression:
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
return ZSTDNotFinal_Supported();
#else
return false;
#endif
case kZSTD:
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
return ZSTD_Supported();
#else
return false;
#endif
default:
assert(false);
return false;
}
}
inline std::string CompressionTypeToString(CompressionType compression_type) { inline std::string CompressionTypeToString(CompressionType compression_type) {
switch (compression_type) { switch (compression_type) {
case kNoCompression: case kNoCompression:

Loading…
Cancel
Save