From 321dfdc3aeff82269579f39ad2d0e108b7f35e3c Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Sat, 27 Oct 2012 23:13:17 -0700 Subject: [PATCH] Allow having different compression algorithms on different levels. Summary: The leveldb API is enhanced to support different compression algorithms at different levels. This adds the option min_level_to_compress to db_bench that specifies the minimum level for which compression should be done when compression is enabled. This can be used to disable compression for levels 0 and 1 which are likely to suffer from stalls because of the CPU load for memtable flushes and (L0,L1) compaction. Level 0 is special as it gets frequent memtable flushes. Level 1 is special as it frequently gets all:all file compactions between it and level 0. But all other levels could be the same. For any level N where N > 1, the rate of sequential IO for that level should be the same. The last level is the exception because it might not be full and because files from it are not read to compact with the next larger level. The same amount of time will be spent doing compaction at any level N excluding N=0, 1 or the last level. By this standard all of those levels should use the same compression. The difference is that the loss (using more disk space) from a faster compression algorithm is less significant for N=2 than for N=3. So we might be willing to trade disk space for faster write rates with no compression for L0 and L1, snappy for L2, zlib for L3. Using a faster compression algorithm for the mid levels also allows us to reclaim some cpu without trading off much loss in disk space overhead. Also note that little is to be gained by compressing levels 0 and 1. For a 4-level tree they account for 10% of the data. For a 5-level tree they account for 1% of the data. With compression enabled: * memtable flush rate is ~18MB/second * (L0,L1) compaction rate is ~30MB/second With compression enabled but min_level_to_compress=2 * memtable flush rate is ~320MB/second * (L0,L1) compaction rate is ~560MB/second This practicaly takes the same code from https://reviews.facebook.net/D6225 but makes the leveldb api more general purpose with a few additional lines of code. Test Plan: make check Differential Revision: https://reviews.facebook.net/D6261 --- db/builder.cc | 2 +- db/db_bench.cc | 21 ++++++++ db/db_impl.cc | 12 ++++- db/db_test.cc | 92 +++++++++++++++++++++++++++++++++ include/leveldb/options.h | 14 +++++ include/leveldb/table_builder.h | 7 ++- table/table_builder.cc | 21 ++++++-- util/options.cc | 10 +++- 8 files changed, 171 insertions(+), 8 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 4a07aeeb2..1fc200930 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -32,7 +32,7 @@ Status BuildTable(const std::string& dbname, return s; } - TableBuilder* builder = new TableBuilder(options, file); + TableBuilder* builder = new TableBuilder(options, file, 0); meta->smallest.DecodeFrom(iter->key()); for (; iter->Valid(); iter->Next()) { Slice key = iter->key(); diff --git a/db/db_bench.cc b/db/db_bench.cc index d298ac29e..22397a2fc 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -179,6 +179,10 @@ static uint64_t FLAGS_delete_obsolete_files_period_micros = 0; static enum leveldb::CompressionType FLAGS_compression_type = leveldb::kSnappyCompression; +// Allows compression for levels 0 and 1 to be disabled when +// other levels are compressed +static int FLAGS_min_level_to_compress = -1; + // posix or hdfs environment static leveldb::Env* FLAGS_env = leveldb::Env::Default(); @@ -913,6 +917,17 @@ class Benchmark { options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; options.compression = FLAGS_compression_type; + if (FLAGS_min_level_to_compress >= 0) { + assert(FLAGS_min_level_to_compress <= FLAGS_num_levels); + options.compression_per_level = new CompressionType[FLAGS_num_levels]; + for (unsigned int i = 0; i < FLAGS_min_level_to_compress; i++) { + options.compression_per_level[i] = kNoCompression; + } + for (unsigned int i = FLAGS_min_level_to_compress; + i < FLAGS_num_levels; i++) { + options.compression_per_level[i] = FLAGS_compression_type; + } + } options.disable_seek_compaction = FLAGS_disable_seek_compaction; options.delete_obsolete_files_period_micros = FLAGS_delete_obsolete_files_period_micros; @@ -922,6 +937,9 @@ class Benchmark { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); exit(1); } + if (FLAGS_min_level_to_compress >= 0) { + delete options.compression_per_level; + } } void WriteSeq(ThreadState* thread) { @@ -1321,6 +1339,9 @@ int main(int argc, char** argv) { else { fprintf(stdout, "Cannot parse %s\n", argv[i]); } + } else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1 + && n >= 0) { + FLAGS_min_level_to_compress = n; } else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { FLAGS_disable_seek_compaction = n; diff --git a/db/db_impl.cc b/db/db_impl.cc index f179e77dc..211010990 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -156,6 +156,12 @@ Options SanitizeOptions(const std::string& dbname, if (result.block_cache == NULL) { result.block_cache = NewLRUCache(8 << 20); } + if (src.compression_per_level != NULL) { + result.compression_per_level = new CompressionType[src.num_levels]; + for (unsigned int i = 0; i < src.num_levels; i++) { + result.compression_per_level[i] = src.compression_per_level[i]; + } + } return result; } @@ -246,6 +252,9 @@ DBImpl::~DBImpl() { if (owns_cache_) { delete options_.block_cache; } + if (options_.compression_per_level != NULL) { + delete options_.compression_per_level; + } delete logger_; } @@ -961,7 +970,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { std::string fname = TableFileName(dbname_, file_number); Status s = env_->NewWritableFile(fname, &compact->outfile); if (s.ok()) { - compact->builder = new TableBuilder(options_, compact->outfile); + compact->builder = new TableBuilder(options_, compact->outfile, + compact->compaction->level() + 1); } return s; } diff --git a/db/db_test.cc b/db/db_test.cc index daea3aaaf..e5a8f8711 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -20,6 +20,24 @@ namespace leveldb { +static bool SnappyCompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Snappy_Compress(in.data(), in.size(), &out); +} + +static bool ZlibCompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Zlib_Compress(in.data(), in.size(), &out); +} + +static bool BZip2CompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::BZip2_Compress(in.data(), in.size(), &out); +} + static std::string RandomString(Random* rnd, int len) { std::string r; test::RandomString(rnd, len, &r); @@ -1058,6 +1076,80 @@ TEST(DBTest, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1), 1); } +void MinLevelHelper(DBTest* self, Options& options) { + Random rnd(301); + + for (int num = 0; + num < options.level0_file_num_compaction_trigger - 1; + num++) + { + std::vector values; + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(self->Put(Key(i), values[i])); + } + self->dbfull()->TEST_WaitForCompactMemTable(); + ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1); + } + + //generate one more file in level-0, and should trigger level-0 compaction + std::vector values; + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(self->Put(Key(i), values[i])); + } + self->dbfull()->TEST_WaitForCompact(); + + ASSERT_EQ(self->NumTableFilesAtLevel(0), 0); + ASSERT_EQ(self->NumTableFilesAtLevel(1), 1); +} + +TEST(DBTest, MinLevelToCompress) { + Options options = CurrentOptions(); + options.write_buffer_size = 100<<10; //100KB + options.num_levels = 3; + options.max_mem_compaction_level = 0; + options.level0_file_num_compaction_trigger = 3; + options.create_if_missing = true; + CompressionType type; + + if (SnappyCompressionSupported()) { + type = kSnappyCompression; + fprintf(stderr, "using snappy\n"); + } else if (ZlibCompressionSupported()) { + type = kZlibCompression; + fprintf(stderr, "using zlib\n"); + } else if (BZip2CompressionSupported()) { + type = kBZip2Compression; + fprintf(stderr, "using bzip2\n"); + } else { + fprintf(stderr, "skipping test, compression disabled\n"); + return; + } + options.compression_per_level = new CompressionType[options.num_levels]; + + // do not compress L0 + for (int i = 0; i < 1; i++) { + options.compression_per_level[i] = kNoCompression; + } + for (int i = 1; i < options.num_levels; i++) { + options.compression_per_level[i] = type; + } + Reopen(&options); + MinLevelHelper(this, options); + + // do not compress L0 and L1 + for (int i = 0; i < 2; i++) { + options.compression_per_level[i] = kNoCompression; + } + for (int i = 2; i < options.num_levels; i++) { + options.compression_per_level[i] = type; + } + DestroyAndReopen(&options); + MinLevelHelper(this, options); +} + TEST(DBTest, RepeatedWritesToSameKey) { Options options = CurrentOptions(); options.env = env_; diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 3b94d829f..c75e725f5 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -134,6 +134,20 @@ struct Options { // efficiently detect that and will switch to uncompressed mode. CompressionType compression; + // Different levels can have different compression policies. There + // are cases where most lower levels would like to quick compression + // algorithm while the higher levels (which have more data) use + // compression algorithms that have better compression but could + // be slower. This array, if non NULL, should have an entry for + // each level of the database. This array, if non NULL, overides the + // value specified in the previous field 'compression'. The caller is + // reponsible for allocating memory and initializing the values in it + // before invoking Open(). The caller is responsible for freeing this + // array and it could be freed anytime after the return from Open(). + // This could have been a std::vector but that makes the equivalent + // java/C api hard to construct. + CompressionType* compression_per_level; + // If non-NULL, use the specified filter policy to reduce disk reads. // Many applications will benefit from passing the result of // NewBloomFilterPolicy() here. diff --git a/include/leveldb/table_builder.h b/include/leveldb/table_builder.h index 5fd1dc71f..5c755137e 100644 --- a/include/leveldb/table_builder.h +++ b/include/leveldb/table_builder.h @@ -27,8 +27,10 @@ class TableBuilder { public: // Create a builder that will store the contents of the table it is // building in *file. Does not close the file. It is up to the - // caller to close the file after calling Finish(). - TableBuilder(const Options& options, WritableFile* file); + // caller to close the file after calling Finish(). The output file + // will be part of level specified by 'level'. A value of -1 means + // that the caller does not know which level the output file will reside. + TableBuilder(const Options& options, WritableFile* file, int level=-1); // REQUIRES: Either Finish() or Abandon() has been called. ~TableBuilder(); @@ -81,6 +83,7 @@ class TableBuilder { struct Rep; Rep* rep_; + int level_; // No copying allowed TableBuilder(const TableBuilder&); diff --git a/table/table_builder.cc b/table/table_builder.cc index eabdedead..5306b2976 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -60,8 +60,9 @@ struct TableBuilder::Rep { } }; -TableBuilder::TableBuilder(const Options& options, WritableFile* file) - : rep_(new Rep(options, file)) { +TableBuilder::TableBuilder(const Options& options, WritableFile* file, + int level) + : rep_(new Rep(options, file)), level_(level) { if (rep_->filter_block != NULL) { rep_->filter_block->StartBlock(0); } @@ -152,7 +153,21 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { Slice block_contents; std::string* compressed = &r->compressed_output; - CompressionType type = r->options.compression; + CompressionType type; + // If the use has specified a different compression level for each level, + // then pick the compresison for that level. + if (r->options.compression_per_level != NULL) { + if (level_ == -1) { + // this is mostly for backward compatibility. The builder does not + // know which level this file belongs to. Apply the compression level + // specified for level 0 to all levels. + type = r->options.compression_per_level[0]; + } else { + type = r->options.compression_per_level[level_]; + } + } else { + type = r->options.compression; + } switch (type) { case kNoCompression: block_contents = raw; diff --git a/util/options.cc b/util/options.cc index eb132877e..40933c54d 100644 --- a/util/options.cc +++ b/util/options.cc @@ -24,6 +24,7 @@ Options::Options() block_size(4096), block_restart_interval(16), compression(kSnappyCompression), + compression_per_level(NULL), filter_policy(NULL), num_levels(7), level0_file_num_compaction_trigger(4), @@ -63,7 +64,14 @@ Options::Dump( Log(log," Options.block_cache_size: %zd", block_cache->GetCapacity()); Log(log," Options.block_size: %zd", block_size); Log(log,"Options.block_restart_interval: %d", block_restart_interval); - Log(log," Options.compression: %d", compression); + if (compression_per_level != NULL) { + for (unsigned int i = 0; i < num_levels; i++){ + Log(log," Options.compression[%d]: %d", + i, compression_per_level[i]); + } + } else { + Log(log," Options.compression: %d", compression); + } Log(log," Options.filter_policy: %s", filter_policy == NULL ? "NULL" : filter_policy->Name()); Log(log," Options.num_levels: %d", num_levels);