Allow having different compression algorithms on different levels.

Summary:
The leveldb API is enhanced to support different compression algorithms at
different levels.

This adds the option min_level_to_compress to db_bench that specifies
the minimum level for which compression should be done when
compression is enabled. This can be used to disable compression for levels
0 and 1 which are likely to suffer from stalls because of the CPU load
for memtable flushes and (L0,L1) compaction.  Level 0 is special as it
gets frequent memtable flushes. Level 1 is special as it frequently
gets all:all file compactions between it and level 0. But all other levels
could be the same. For any level N where N > 1, the rate of sequential
IO for that level should be the same. The last level is the
exception because it might not be full and because files from it are
not read to compact with the next larger level.

The same amount of time will be spent doing compaction at any
level N excluding N=0, 1 or the last level. By this standard all
of those levels should use the same compression. The difference is that
the loss (using more disk space) from a faster compression algorithm
is less significant for N=2 than for N=3. So we might be willing to
trade disk space for faster write rates with no compression
for L0 and L1, snappy for L2, zlib for L3. Using a faster compression
algorithm for the mid levels also allows us to reclaim some cpu
without trading off much loss in disk space overhead.

Also note that little is to be gained by compressing levels 0 and 1. For
a 4-level tree they account for 10% of the data. For a 5-level tree they
account for 1% of the data.

With compression enabled:
* memtable flush rate is ~18MB/second
* (L0,L1) compaction rate is ~30MB/second

With compression enabled but min_level_to_compress=2
* memtable flush rate is ~320MB/second
* (L0,L1) compaction rate is ~560MB/second

This practicaly takes the same code from https://reviews.facebook.net/D6225
but makes the leveldb api more general purpose with a few additional
lines of code.

Test Plan: make check

Differential Revision: https://reviews.facebook.net/D6261
main
Dhruba Borthakur 12 years ago
parent acc8567b24
commit 321dfdc3ae
  1. 2
      db/builder.cc
  2. 21
      db/db_bench.cc
  3. 12
      db/db_impl.cc
  4. 92
      db/db_test.cc
  5. 14
      include/leveldb/options.h
  6. 7
      include/leveldb/table_builder.h
  7. 21
      table/table_builder.cc
  8. 8
      util/options.cc

@ -32,7 +32,7 @@ Status BuildTable(const std::string& dbname,
return s; return s;
} }
TableBuilder* builder = new TableBuilder(options, file); TableBuilder* builder = new TableBuilder(options, file, 0);
meta->smallest.DecodeFrom(iter->key()); meta->smallest.DecodeFrom(iter->key());
for (; iter->Valid(); iter->Next()) { for (; iter->Valid(); iter->Next()) {
Slice key = iter->key(); Slice key = iter->key();

@ -179,6 +179,10 @@ static uint64_t FLAGS_delete_obsolete_files_period_micros = 0;
static enum leveldb::CompressionType FLAGS_compression_type = static enum leveldb::CompressionType FLAGS_compression_type =
leveldb::kSnappyCompression; leveldb::kSnappyCompression;
// Allows compression for levels 0 and 1 to be disabled when
// other levels are compressed
static int FLAGS_min_level_to_compress = -1;
// posix or hdfs environment // posix or hdfs environment
static leveldb::Env* FLAGS_env = leveldb::Env::Default(); static leveldb::Env* FLAGS_env = leveldb::Env::Default();
@ -913,6 +917,17 @@ class Benchmark {
options.level0_slowdown_writes_trigger = options.level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger; FLAGS_level0_slowdown_writes_trigger;
options.compression = FLAGS_compression_type; options.compression = FLAGS_compression_type;
if (FLAGS_min_level_to_compress >= 0) {
assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
options.compression_per_level = new CompressionType[FLAGS_num_levels];
for (unsigned int i = 0; i < FLAGS_min_level_to_compress; i++) {
options.compression_per_level[i] = kNoCompression;
}
for (unsigned int i = FLAGS_min_level_to_compress;
i < FLAGS_num_levels; i++) {
options.compression_per_level[i] = FLAGS_compression_type;
}
}
options.disable_seek_compaction = FLAGS_disable_seek_compaction; options.disable_seek_compaction = FLAGS_disable_seek_compaction;
options.delete_obsolete_files_period_micros = options.delete_obsolete_files_period_micros =
FLAGS_delete_obsolete_files_period_micros; FLAGS_delete_obsolete_files_period_micros;
@ -922,6 +937,9 @@ class Benchmark {
fprintf(stderr, "open error: %s\n", s.ToString().c_str()); fprintf(stderr, "open error: %s\n", s.ToString().c_str());
exit(1); exit(1);
} }
if (FLAGS_min_level_to_compress >= 0) {
delete options.compression_per_level;
}
} }
void WriteSeq(ThreadState* thread) { void WriteSeq(ThreadState* thread) {
@ -1321,6 +1339,9 @@ int main(int argc, char** argv) {
else { else {
fprintf(stdout, "Cannot parse %s\n", argv[i]); fprintf(stdout, "Cannot parse %s\n", argv[i]);
} }
} else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1
&& n >= 0) {
FLAGS_min_level_to_compress = n;
} else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1 } else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1
&& (n == 0 || n == 1)) { && (n == 0 || n == 1)) {
FLAGS_disable_seek_compaction = n; FLAGS_disable_seek_compaction = n;

@ -156,6 +156,12 @@ Options SanitizeOptions(const std::string& dbname,
if (result.block_cache == NULL) { if (result.block_cache == NULL) {
result.block_cache = NewLRUCache(8 << 20); result.block_cache = NewLRUCache(8 << 20);
} }
if (src.compression_per_level != NULL) {
result.compression_per_level = new CompressionType[src.num_levels];
for (unsigned int i = 0; i < src.num_levels; i++) {
result.compression_per_level[i] = src.compression_per_level[i];
}
}
return result; return result;
} }
@ -246,6 +252,9 @@ DBImpl::~DBImpl() {
if (owns_cache_) { if (owns_cache_) {
delete options_.block_cache; delete options_.block_cache;
} }
if (options_.compression_per_level != NULL) {
delete options_.compression_per_level;
}
delete logger_; delete logger_;
} }
@ -961,7 +970,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
std::string fname = TableFileName(dbname_, file_number); std::string fname = TableFileName(dbname_, file_number);
Status s = env_->NewWritableFile(fname, &compact->outfile); Status s = env_->NewWritableFile(fname, &compact->outfile);
if (s.ok()) { if (s.ok()) {
compact->builder = new TableBuilder(options_, compact->outfile); compact->builder = new TableBuilder(options_, compact->outfile,
compact->compaction->level() + 1);
} }
return s; return s;
} }

@ -20,6 +20,24 @@
namespace leveldb { namespace leveldb {
static bool SnappyCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(in.data(), in.size(), &out);
}
static bool ZlibCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Zlib_Compress(in.data(), in.size(), &out);
}
static bool BZip2CompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::BZip2_Compress(in.data(), in.size(), &out);
}
static std::string RandomString(Random* rnd, int len) { static std::string RandomString(Random* rnd, int len) {
std::string r; std::string r;
test::RandomString(rnd, len, &r); test::RandomString(rnd, len, &r);
@ -1058,6 +1076,80 @@ TEST(DBTest, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1), 1); ASSERT_EQ(NumTableFilesAtLevel(1), 1);
} }
void MinLevelHelper(DBTest* self, Options& options) {
Random rnd(301);
for (int num = 0;
num < options.level0_file_num_compaction_trigger - 1;
num++)
{
std::vector<std::string> values;
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(self->Put(Key(i), values[i]));
}
self->dbfull()->TEST_WaitForCompactMemTable();
ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1);
}
//generate one more file in level-0, and should trigger level-0 compaction
std::vector<std::string> values;
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(self->Put(Key(i), values[i]));
}
self->dbfull()->TEST_WaitForCompact();
ASSERT_EQ(self->NumTableFilesAtLevel(0), 0);
ASSERT_EQ(self->NumTableFilesAtLevel(1), 1);
}
TEST(DBTest, MinLevelToCompress) {
Options options = CurrentOptions();
options.write_buffer_size = 100<<10; //100KB
options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 3;
options.create_if_missing = true;
CompressionType type;
if (SnappyCompressionSupported()) {
type = kSnappyCompression;
fprintf(stderr, "using snappy\n");
} else if (ZlibCompressionSupported()) {
type = kZlibCompression;
fprintf(stderr, "using zlib\n");
} else if (BZip2CompressionSupported()) {
type = kBZip2Compression;
fprintf(stderr, "using bzip2\n");
} else {
fprintf(stderr, "skipping test, compression disabled\n");
return;
}
options.compression_per_level = new CompressionType[options.num_levels];
// do not compress L0
for (int i = 0; i < 1; i++) {
options.compression_per_level[i] = kNoCompression;
}
for (int i = 1; i < options.num_levels; i++) {
options.compression_per_level[i] = type;
}
Reopen(&options);
MinLevelHelper(this, options);
// do not compress L0 and L1
for (int i = 0; i < 2; i++) {
options.compression_per_level[i] = kNoCompression;
}
for (int i = 2; i < options.num_levels; i++) {
options.compression_per_level[i] = type;
}
DestroyAndReopen(&options);
MinLevelHelper(this, options);
}
TEST(DBTest, RepeatedWritesToSameKey) { TEST(DBTest, RepeatedWritesToSameKey) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = env_; options.env = env_;

@ -134,6 +134,20 @@ struct Options {
// efficiently detect that and will switch to uncompressed mode. // efficiently detect that and will switch to uncompressed mode.
CompressionType compression; CompressionType compression;
// Different levels can have different compression policies. There
// are cases where most lower levels would like to quick compression
// algorithm while the higher levels (which have more data) use
// compression algorithms that have better compression but could
// be slower. This array, if non NULL, should have an entry for
// each level of the database. This array, if non NULL, overides the
// value specified in the previous field 'compression'. The caller is
// reponsible for allocating memory and initializing the values in it
// before invoking Open(). The caller is responsible for freeing this
// array and it could be freed anytime after the return from Open().
// This could have been a std::vector but that makes the equivalent
// java/C api hard to construct.
CompressionType* compression_per_level;
// If non-NULL, use the specified filter policy to reduce disk reads. // If non-NULL, use the specified filter policy to reduce disk reads.
// Many applications will benefit from passing the result of // Many applications will benefit from passing the result of
// NewBloomFilterPolicy() here. // NewBloomFilterPolicy() here.

@ -27,8 +27,10 @@ class TableBuilder {
public: public:
// Create a builder that will store the contents of the table it is // Create a builder that will store the contents of the table it is
// building in *file. Does not close the file. It is up to the // building in *file. Does not close the file. It is up to the
// caller to close the file after calling Finish(). // caller to close the file after calling Finish(). The output file
TableBuilder(const Options& options, WritableFile* file); // will be part of level specified by 'level'. A value of -1 means
// that the caller does not know which level the output file will reside.
TableBuilder(const Options& options, WritableFile* file, int level=-1);
// REQUIRES: Either Finish() or Abandon() has been called. // REQUIRES: Either Finish() or Abandon() has been called.
~TableBuilder(); ~TableBuilder();
@ -81,6 +83,7 @@ class TableBuilder {
struct Rep; struct Rep;
Rep* rep_; Rep* rep_;
int level_;
// No copying allowed // No copying allowed
TableBuilder(const TableBuilder&); TableBuilder(const TableBuilder&);

@ -60,8 +60,9 @@ struct TableBuilder::Rep {
} }
}; };
TableBuilder::TableBuilder(const Options& options, WritableFile* file) TableBuilder::TableBuilder(const Options& options, WritableFile* file,
: rep_(new Rep(options, file)) { int level)
: rep_(new Rep(options, file)), level_(level) {
if (rep_->filter_block != NULL) { if (rep_->filter_block != NULL) {
rep_->filter_block->StartBlock(0); rep_->filter_block->StartBlock(0);
} }
@ -152,7 +153,21 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
Slice block_contents; Slice block_contents;
std::string* compressed = &r->compressed_output; std::string* compressed = &r->compressed_output;
CompressionType type = r->options.compression; CompressionType type;
// If the use has specified a different compression level for each level,
// then pick the compresison for that level.
if (r->options.compression_per_level != NULL) {
if (level_ == -1) {
// this is mostly for backward compatibility. The builder does not
// know which level this file belongs to. Apply the compression level
// specified for level 0 to all levels.
type = r->options.compression_per_level[0];
} else {
type = r->options.compression_per_level[level_];
}
} else {
type = r->options.compression;
}
switch (type) { switch (type) {
case kNoCompression: case kNoCompression:
block_contents = raw; block_contents = raw;

@ -24,6 +24,7 @@ Options::Options()
block_size(4096), block_size(4096),
block_restart_interval(16), block_restart_interval(16),
compression(kSnappyCompression), compression(kSnappyCompression),
compression_per_level(NULL),
filter_policy(NULL), filter_policy(NULL),
num_levels(7), num_levels(7),
level0_file_num_compaction_trigger(4), level0_file_num_compaction_trigger(4),
@ -63,7 +64,14 @@ Options::Dump(
Log(log," Options.block_cache_size: %zd", block_cache->GetCapacity()); Log(log," Options.block_cache_size: %zd", block_cache->GetCapacity());
Log(log," Options.block_size: %zd", block_size); Log(log," Options.block_size: %zd", block_size);
Log(log,"Options.block_restart_interval: %d", block_restart_interval); Log(log,"Options.block_restart_interval: %d", block_restart_interval);
if (compression_per_level != NULL) {
for (unsigned int i = 0; i < num_levels; i++){
Log(log," Options.compression[%d]: %d",
i, compression_per_level[i]);
}
} else {
Log(log," Options.compression: %d", compression); Log(log," Options.compression: %d", compression);
}
Log(log," Options.filter_policy: %s", Log(log," Options.filter_policy: %s",
filter_policy == NULL ? "NULL" : filter_policy->Name()); filter_policy == NULL ? "NULL" : filter_policy->Name());
Log(log," Options.num_levels: %d", num_levels); Log(log," Options.num_levels: %d", num_levels);

Loading…
Cancel
Save