From 925f60d39dbd065429e0383204220ac51b5c3884 Mon Sep 17 00:00:00 2001 From: heyongqiang Date: Mon, 29 Oct 2012 15:25:01 -0700 Subject: [PATCH 1/6] add a test case to make sure chaning num_levels will fail Summary: Summary: as subject Test Plan: db_test Reviewers: dhruba, MarkCallaghan Reviewed By: MarkCallaghan Differential Revision: https://reviews.facebook.net/D6303 --- db/db_test.cc | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/db/db_test.cc b/db/db_test.cc index e5a8f8711..6c8bba868 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1682,6 +1682,28 @@ TEST(DBTest, DBOpen_Options) { db = NULL; } +TEST(DBTest, DBOpen_Change_NumLevels) { + std::string dbname = test::TmpDir() + "/db_change_num_levels"; + DestroyDB(dbname, Options()); + Options opts; + Status s; + opts.create_if_missing = true; + s = DB::Open(opts, dbname, &db_); + ASSERT_OK(s); + ASSERT_TRUE(db_ != NULL); + db_->Put(WriteOptions(), "a", "123"); + db_->Put(WriteOptions(), "b", "234"); + db_->CompactRange(NULL, NULL); + delete db_; + db_ = NULL; + + opts.create_if_missing = false; + opts.num_levels = 2; + s = DB::Open(opts, dbname, &db_); + ASSERT_TRUE(strstr(s.ToString().c_str(), "Corruption") != NULL); + ASSERT_TRUE(db_ == NULL); +} + // Check that number of files does not grow when we are out of space TEST(DBTest, NoSpace) { Options options = CurrentOptions(); From fb8d4373257ae4c62b0e9ead80ccd04afccf9322 Mon Sep 17 00:00:00 2001 From: heyongqiang Date: Mon, 29 Oct 2012 16:26:20 -0700 Subject: [PATCH 2/6] fix test failure Summary: as subject Test Plan: db_test Reviewers: dhruba, MarkCallaghan Reviewed By: MarkCallaghan Differential Revision: https://reviews.facebook.net/D6309 --- db/db_test.cc | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 6c8bba868..e3d61ab74 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1687,21 +1687,22 @@ TEST(DBTest, DBOpen_Change_NumLevels) { DestroyDB(dbname, Options()); Options opts; Status s; + DB* db = NULL; opts.create_if_missing = true; - s = DB::Open(opts, dbname, &db_); + s = DB::Open(opts, dbname, &db); ASSERT_OK(s); - ASSERT_TRUE(db_ != NULL); - db_->Put(WriteOptions(), "a", "123"); - db_->Put(WriteOptions(), "b", "234"); - db_->CompactRange(NULL, NULL); - delete db_; - db_ = NULL; + ASSERT_TRUE(db != NULL); + db->Put(WriteOptions(), "a", "123"); + db->Put(WriteOptions(), "b", "234"); + db->CompactRange(NULL, NULL); + delete db; + db = NULL; opts.create_if_missing = false; opts.num_levels = 2; - s = DB::Open(opts, dbname, &db_); + s = DB::Open(opts, dbname, &db); ASSERT_TRUE(strstr(s.ToString().c_str(), "Corruption") != NULL); - ASSERT_TRUE(db_ == NULL); + ASSERT_TRUE(db == NULL); } // Check that number of files does not grow when we are out of space From 3e7e269292e60c51331839ac00cea409a0439d1d Mon Sep 17 00:00:00 2001 From: Mark Callaghan Date: Mon, 29 Oct 2012 12:04:27 -0700 Subject: [PATCH 3/6] Use timer to measure sleep rather than assume it is 1000 usecs Summary: This makes the stall timers in MakeRoomForWrite more accurate by timing the sleeps. From looking at the logs the real sleep times are usually about 2000 usecs each when SleepForMicros(1000) is called. The modified LOG messages are: 2012/10/29-12:06:33.271984 2b3cc872f700 delaying write 13 usecs for level0_slowdown_writes_trigger 2012/10/29-12:06:34.688939 2b3cc872f700 delaying write 1728 usecs for rate limits with max score 3.83 Task ID: # Blame Rev: Test Plan: run db_bench, look at DB/LOG Revert Plan: Database Impact: Memcache Impact: Other Notes: EImportant: - begin *PUBLIC* platform impact section - Bugzilla: # - end platform impact - Reviewers: dhruba Reviewed By: dhruba Differential Revision: https://reviews.facebook.net/D6297 --- db/db_impl.cc | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 211010990..1d61099ad 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1517,10 +1517,14 @@ Status DBImpl::MakeRoomForWrite(bool force) { // this delay hands over some CPU to the compaction thread in // case it is sharing the same core as the writer. mutex_.Unlock(); + uint64_t t1 = env_->NowMicros(); env_->SleepForMicroseconds(1000); - stall_level0_slowdown_ += 1000; + uint64_t delayed = env_->NowMicros() - t1; + stall_level0_slowdown_ += delayed; allow_delay = false; // Do not delay a single write more than once - Log(options_.info_log, "delaying write...\n"); + Log(options_.info_log, + "delaying write %llu usecs for level0_slowdown_writes_trigger\n", + delayed); mutex_.Lock(); } else if (!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { @@ -1546,11 +1550,14 @@ Status DBImpl::MakeRoomForWrite(bool force) { (score = versions_->MaxCompactionScore()) > options_.rate_limit) { // Delay a write when the compaction score for any level is too large. mutex_.Unlock(); + uint64_t t1 = env_->NowMicros(); env_->SleepForMicroseconds(1000); - stall_leveln_slowdown_ += 1000; + uint64_t delayed = env_->NowMicros() - t1; + stall_leveln_slowdown_ += delayed; allow_delay = false; // Do not delay a single write more than once Log(options_.info_log, - "delaying write for rate limits with max score %.2f\n", score); + "delaying write %llu usecs for rate limits with max score %.2f\n", + delayed, score); mutex_.Lock(); } else { // Attempt to switch to a new memtable and trigger compaction of old From 3096fa75342f39b642eb1b212504af1bd205cb72 Mon Sep 17 00:00:00 2001 From: heyongqiang Date: Wed, 31 Oct 2012 17:02:24 -0700 Subject: [PATCH 4/6] Add two more options: disable block cache and make table cache shard number configuable Summary: as subject Test Plan: run db_bench and db_test Reviewers: dhruba Reviewed By: dhruba Differential Revision: https://reviews.facebook.net/D6111 --- db/db_bench.cc | 13 +++++++++++++ db/db_impl.cc | 6 +++++- db/table_cache.cc | 2 +- include/leveldb/options.h | 8 ++++++++ util/options.cc | 11 ++++++++++- 5 files changed, 37 insertions(+), 3 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 22397a2fc..86547bbbc 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -183,6 +183,8 @@ static enum leveldb::CompressionType FLAGS_compression_type = // other levels are compressed static int FLAGS_min_level_to_compress = -1; +static int FLAGS_table_cache_numshardbits = 4; + // posix or hdfs environment static leveldb::Env* FLAGS_env = leveldb::Env::Default(); @@ -897,6 +899,9 @@ class Benchmark { Options options; options.create_if_missing = !FLAGS_use_existing_db; options.block_cache = cache_; + if (cache_ == NULL) { + options.no_block_cache = true; + } options.write_buffer_size = FLAGS_write_buffer_size; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; @@ -932,6 +937,7 @@ class Benchmark { options.delete_obsolete_files_period_micros = FLAGS_delete_obsolete_files_period_micros; options.rate_limit = FLAGS_rate_limit; + options.table_cache_numshardbits = FLAGS_table_cache_numshardbits; Status s = DB::Open(options, FLAGS_db, &db_); if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); @@ -1256,6 +1262,13 @@ int main(int argc, char** argv) { fprintf(stderr, "The cache cannot be sharded into 2**%d pieces\n", n); exit(1); } + } else if (sscanf(argv[i], "--table_cache_numshardbits=%d%c", + &n, &junk) == 1) { + if (n <= 0 || n > 20) { + fprintf(stderr, "The cache cannot be sharded into 2**%d pieces\n", n); + exit(1); + } + FLAGS_table_cache_numshardbits = n; } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) { FLAGS_bloom_bits = n; } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 1d61099ad..62f8592ca 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -153,7 +153,7 @@ Options SanitizeOptions(const std::string& dbname, result.info_log = NULL; } } - if (result.block_cache == NULL) { + if (result.block_cache == NULL && !result.no_block_cache) { result.block_cache = NewLRUCache(8 << 20); } if (src.compression_per_level != NULL) { @@ -1735,6 +1735,10 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { *dbptr = NULL; + if (options.block_cache != NULL && options.no_block_cache) { + return Status::InvalidArgument( + "no_block_cache is true while block_cache is not NULL"); + } DBImpl* impl = new DBImpl(options, dbname); impl->mutex_.Lock(); VersionEdit edit(impl->NumberLevels()); diff --git a/db/table_cache.cc b/db/table_cache.cc index d47613248..592a6420b 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -39,7 +39,7 @@ TableCache::TableCache(const std::string& dbname, : env_(options->env), dbname_(dbname), options_(options), - cache_(NewLRUCache(entries)) { + cache_(NewLRUCache(entries, options->table_cache_numshardbits)) { dbstatistics = (DBStatistics*)options->statistics; } diff --git a/include/leveldb/options.h b/include/leveldb/options.h index c75e725f5..cc50ba1f8 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -271,6 +271,14 @@ struct Options { // exceeds rate_limit. This is ignored when <= 1.0. double rate_limit; + // Disable block cache. If this is set to false, + // then no block cache should be used, and the block_cache should + // point to a NULL object. + bool no_block_cache; + + // Number of shards used for table cache. + int table_cache_numshardbits; + // Create an Options object with default values for all fields. Options(); diff --git a/util/options.cc b/util/options.cc index 40933c54d..4a2d53456 100644 --- a/util/options.cc +++ b/util/options.cc @@ -43,6 +43,8 @@ Options::Options() db_stats_log_interval(1800), db_log_dir(""), disable_seek_compaction(false), + no_block_cache(false), + table_cache_numshardbits(4), max_log_file_size(0), delete_obsolete_files_period_micros(0), rate_limit(0.0) { @@ -61,7 +63,10 @@ Options::Dump( Log(log," Options.write_buffer_size: %zd", write_buffer_size); Log(log," Options.max_open_files: %d", max_open_files); Log(log," Options.block_cache: %p", block_cache); - Log(log," Options.block_cache_size: %zd", block_cache->GetCapacity()); + if (block_cache) { + Log(log," Options.block_cache_size: %zd", + block_cache->GetCapacity()); + } Log(log," Options.block_size: %zd", block_size); Log(log,"Options.block_restart_interval: %d", block_restart_interval); if (compression_per_level != NULL) { @@ -104,6 +109,10 @@ Options::Dump( db_log_dir.c_str()); Log(log," Options.disable_seek_compaction: %d", disable_seek_compaction); + Log(log," Options.no_block_cache: %d", + no_block_cache); + Log(log," Options.table_cache_numshardbits: %d", + table_cache_numshardbits); Log(log," Options.delete_obsolete_files_period_micros: %ld", delete_obsolete_files_period_micros); Log(log," Options.rate_limit: %.2f", From 854c66b089bef5d27f79750884f70f6e2c8c69da Mon Sep 17 00:00:00 2001 From: amayank Date: Thu, 1 Nov 2012 10:50:08 -0700 Subject: [PATCH 5/6] Make compression options configurable. These include window-bits, level and strategy for ZlibCompression Summary: Leveldb currently uses windowBits=-14 while using zlib compression.(It was earlier 15). This makes the setting configurable. Related changes here: https://reviews.facebook.net/D6105 Test Plan: make all check Reviewers: dhruba, MarkCallaghan, sheki, heyongqiang Differential Revision: https://reviews.facebook.net/D6393 --- db/c.cc | 7 ++ db/c_test.c | 1 + db/db_bench.cc | 25 +++-- db/db_test.cc | 186 ++++++++++++++++++++++---------------- include/leveldb/options.h | 18 +++- port/port_posix.h | 18 ++-- table/table_builder.cc | 9 +- table/table_test.cc | 10 +- util/options.cc | 8 +- 9 files changed, 175 insertions(+), 107 deletions(-) diff --git a/db/c.cc b/db/c.cc index c775fc919..db88c7a83 100644 --- a/db/c.cc +++ b/db/c.cc @@ -502,6 +502,13 @@ void leveldb_options_set_compression(leveldb_options_t* opt, int t) { opt->rep.compression = static_cast(t); } +void leveldb_options_set_compression_options( + leveldb_options_t* opt, int w_bits, int level, int strategy) { + opt->rep.compression_opts.window_bits = w_bits; + opt->rep.compression_opts.level = level; + opt->rep.compression_opts.strategy = strategy; +} + void leveldb_options_set_disable_data_sync( leveldb_options_t* opt, bool disable_data_sync) { opt->rep.disableDataSync = disable_data_sync; diff --git a/db/c_test.c b/db/c_test.c index 979244715..a1e82b2f1 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -187,6 +187,7 @@ int main(int argc, char** argv) { leveldb_options_set_block_size(options, 1024); leveldb_options_set_block_restart_interval(options, 8); leveldb_options_set_compression(options, leveldb_no_compression); + leveldb_options_set_compression_options(options, -14, -1, 0); roptions = leveldb_readoptions_create(); leveldb_readoptions_set_verify_checksums(roptions, 1); diff --git a/db/db_bench.cc b/db/db_bench.cc index 86547bbbc..8e614d71e 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -179,8 +179,8 @@ static uint64_t FLAGS_delete_obsolete_files_period_micros = 0; static enum leveldb::CompressionType FLAGS_compression_type = leveldb::kSnappyCompression; -// Allows compression for levels 0 and 1 to be disabled when -// other levels are compressed +// Allows compression for levels 0 and 1 to be disabled when +// other levels are compressed static int FLAGS_min_level_to_compress = -1; static int FLAGS_table_cache_numshardbits = 4; @@ -509,15 +509,18 @@ class Benchmark { switch (FLAGS_compression_type) { case kSnappyCompression: - result = port::Snappy_Compress(text, strlen(text), &compressed); + result = port::Snappy_Compress(Options().compression_opts, text, + strlen(text), &compressed); name = "Snappy"; break; case kZlibCompression: - result = port::Zlib_Compress(text, strlen(text), &compressed); + result = port::Zlib_Compress(Options().compression_opts, text, + strlen(text), &compressed); name = "Zlib"; break; case kBZip2Compression: - result = port::BZip2_Compress(text, strlen(text), &compressed); + result = port::BZip2_Compress(Options().compression_opts, text, + strlen(text), &compressed); name = "BZip2"; break; } @@ -855,7 +858,8 @@ class Benchmark { bool ok = true; std::string compressed; while (ok && bytes < 1024 * 1048576) { // Compress 1G - ok = port::Snappy_Compress(input.data(), input.size(), &compressed); + ok = port::Snappy_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); produced += compressed.size(); bytes += input.size(); thread->stats.FinishedSingleOp(NULL); @@ -876,7 +880,8 @@ class Benchmark { RandomGenerator gen; Slice input = gen.Generate(Options().block_size); std::string compressed; - bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed); + bool ok = port::Snappy_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); int64_t bytes = 0; char* uncompressed = new char[input.size()]; while (ok && bytes < 1024 * 1048576) { // Compress 1G @@ -928,7 +933,7 @@ class Benchmark { for (unsigned int i = 0; i < FLAGS_min_level_to_compress; i++) { options.compression_per_level[i] = kNoCompression; } - for (unsigned int i = FLAGS_min_level_to_compress; + for (unsigned int i = FLAGS_min_level_to_compress; i < FLAGS_num_levels; i++) { options.compression_per_level[i] = FLAGS_compression_type; } @@ -1352,8 +1357,8 @@ int main(int argc, char** argv) { else { fprintf(stdout, "Cannot parse %s\n", argv[i]); } - } else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1 - && n >= 0) { + } else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1 + && n >= 0) { FLAGS_min_level_to_compress = n; } else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { diff --git a/db/db_test.cc b/db/db_test.cc index e3d61ab74..d81a88369 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -20,23 +20,23 @@ namespace leveldb { -static bool SnappyCompressionSupported() { - std::string out; - Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Snappy_Compress(in.data(), in.size(), &out); -} - -static bool ZlibCompressionSupported() { - std::string out; - Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Zlib_Compress(in.data(), in.size(), &out); -} - -static bool BZip2CompressionSupported() { - std::string out; - Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::BZip2_Compress(in.data(), in.size(), &out); -} +static bool SnappyCompressionSupported(const CompressionOptions& options) { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Snappy_Compress(options, in.data(), in.size(), &out); +} + +static bool ZlibCompressionSupported(const CompressionOptions& options) { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Zlib_Compress(options, in.data(), in.size(), &out); +} + +static bool BZip2CompressionSupported(const CompressionOptions& options) { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::BZip2_Compress(options, in.data(), in.size(), &out); +} static std::string RandomString(Random* rnd, int len) { std::string r; @@ -1076,57 +1076,60 @@ TEST(DBTest, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1), 1); } -void MinLevelHelper(DBTest* self, Options& options) { - Random rnd(301); - - for (int num = 0; - num < options.level0_file_num_compaction_trigger - 1; - num++) - { - std::vector values; - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { - values.push_back(RandomString(&rnd, 10000)); - ASSERT_OK(self->Put(Key(i), values[i])); - } - self->dbfull()->TEST_WaitForCompactMemTable(); - ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1); - } - - //generate one more file in level-0, and should trigger level-0 compaction - std::vector values; - for (int i = 0; i < 12; i++) { - values.push_back(RandomString(&rnd, 10000)); - ASSERT_OK(self->Put(Key(i), values[i])); - } - self->dbfull()->TEST_WaitForCompact(); - - ASSERT_EQ(self->NumTableFilesAtLevel(0), 0); - ASSERT_EQ(self->NumTableFilesAtLevel(1), 1); -} - -TEST(DBTest, MinLevelToCompress) { - Options options = CurrentOptions(); - options.write_buffer_size = 100<<10; //100KB - options.num_levels = 3; - options.max_mem_compaction_level = 0; - options.level0_file_num_compaction_trigger = 3; +void MinLevelHelper(DBTest* self, Options& options) { + Random rnd(301); + + for (int num = 0; + num < options.level0_file_num_compaction_trigger - 1; + num++) + { + std::vector values; + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(self->Put(Key(i), values[i])); + } + self->dbfull()->TEST_WaitForCompactMemTable(); + ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1); + } + + //generate one more file in level-0, and should trigger level-0 compaction + std::vector values; + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(self->Put(Key(i), values[i])); + } + self->dbfull()->TEST_WaitForCompact(); + + ASSERT_EQ(self->NumTableFilesAtLevel(0), 0); + ASSERT_EQ(self->NumTableFilesAtLevel(1), 1); +} + +void MinLevelToCompress(CompressionType& type, Options& options, int wbits, + int lev, int strategy) { + fprintf(stderr, "Test with compression options : window_bits = %d, level = %d + , strategy = %d}\n", wbits, lev, strategy); + options.write_buffer_size = 100<<10; //100KB + options.num_levels = 3; + options.max_mem_compaction_level = 0; + options.level0_file_num_compaction_trigger = 3; options.create_if_missing = true; - CompressionType type; - - if (SnappyCompressionSupported()) { - type = kSnappyCompression; - fprintf(stderr, "using snappy\n"); - } else if (ZlibCompressionSupported()) { - type = kZlibCompression; - fprintf(stderr, "using zlib\n"); - } else if (BZip2CompressionSupported()) { - type = kBZip2Compression; - fprintf(stderr, "using bzip2\n"); - } else { - fprintf(stderr, "skipping test, compression disabled\n"); - return; - } + + if (SnappyCompressionSupported(CompressionOptions(wbits, lev, strategy))) { + type = kSnappyCompression; + fprintf(stderr, "using snappy\n"); + } else if (ZlibCompressionSupported( + CompressionOptions(wbits, lev, strategy))) { + type = kZlibCompression; + fprintf(stderr, "using zlib\n"); + } else if (BZip2CompressionSupported( + CompressionOptions(wbits, lev, strategy))) { + type = kBZip2Compression; + fprintf(stderr, "using bzip2\n"); + } else { + fprintf(stderr, "skipping test, compression disabled\n"); + return; + } options.compression_per_level = new CompressionType[options.num_levels]; // do not compress L0 @@ -1136,9 +1139,32 @@ TEST(DBTest, MinLevelToCompress) { for (int i = 1; i < options.num_levels; i++) { options.compression_per_level[i] = type; } - Reopen(&options); - MinLevelHelper(this, options); - +} +TEST(DBTest, MinLevelToCompress1) { + Options options = CurrentOptions(); + CompressionType type; + MinLevelToCompress(type, options, -14, -1, 0); + Reopen(&options); + MinLevelHelper(this, options); + + // do not compress L0 and L1 + for (int i = 0; i < 2; i++) { + options.compression_per_level[i] = kNoCompression; + } + for (int i = 2; i < options.num_levels; i++) { + options.compression_per_level[i] = type; + } + DestroyAndReopen(&options); + MinLevelHelper(this, options); +} + +TEST(DBTest, MinLevelToCompress2) { + Options options = CurrentOptions(); + CompressionType type; + MinLevelToCompress(type, options, 15, -1, 0); + Reopen(&options); + MinLevelHelper(this, options); + // do not compress L0 and L1 for (int i = 0; i < 2; i++) { options.compression_per_level[i] = kNoCompression; @@ -1146,9 +1172,9 @@ TEST(DBTest, MinLevelToCompress) { for (int i = 2; i < options.num_levels; i++) { options.compression_per_level[i] = type; } - DestroyAndReopen(&options); - MinLevelHelper(this, options); -} + DestroyAndReopen(&options); + MinLevelHelper(this, options); +} TEST(DBTest, RepeatedWritesToSameKey) { Options options = CurrentOptions(); @@ -1851,7 +1877,7 @@ TEST(DBTest, SnapshotFiles) { uint64_t size; ASSERT_OK(env_->GetFileSize(src, &size)); - // record the number and the size of the + // record the number and the size of the // latest manifest file if (ParseFileName(files[i].substr(1), &number, &type)) { if (type == kDescriptorFile) { @@ -1866,7 +1892,7 @@ TEST(DBTest, SnapshotFiles) { ASSERT_OK(env_->NewSequentialFile(src, &srcfile)); WritableFile* destfile; ASSERT_OK(env_->NewWritableFile(dest, &destfile)); - + char buffer[4096]; Slice slice; while (size > 0) { @@ -1889,7 +1915,7 @@ TEST(DBTest, SnapshotFiles) { extras.push_back(RandomString(&rnd, 100000)); ASSERT_OK(Put(Key(i), extras[i])); } - + // verify that data in the snapshot are correct Options opts; DB* snapdb; @@ -1905,7 +1931,7 @@ TEST(DBTest, SnapshotFiles) { } delete snapdb; - // look at the new live files after we added an 'extra' key + // look at the new live files after we added an 'extra' key // and after we took the first snapshot. uint64_t new_manifest_number = 0; uint64_t new_manifest_size = 0; @@ -1919,7 +1945,7 @@ TEST(DBTest, SnapshotFiles) { // previous shapshot. for (unsigned int i = 0; i < newfiles.size(); i++) { std::string src = dbname_ + "/" + newfiles[i]; - // record the lognumber and the size of the + // record the lognumber and the size of the // latest manifest file if (ParseFileName(newfiles[i].substr(1), &number, &type)) { if (type == kDescriptorFile) { @@ -1934,7 +1960,7 @@ TEST(DBTest, SnapshotFiles) { } ASSERT_EQ(manifest_number, new_manifest_number); ASSERT_GT(new_manifest_size, manifest_size); - + // release file snapshot dbfull()->DisableFileDeletions(); } @@ -1998,7 +2024,7 @@ TEST(DBTest, ReadCompaction) { // in some level, indicating that there was a compaction ASSERT_TRUE(NumTableFilesAtLevel(0) < l1 || NumTableFilesAtLevel(1) < l2 || - NumTableFilesAtLevel(2) < l3); + NumTableFilesAtLevel(2) < l3); delete options.block_cache; } } diff --git a/include/leveldb/options.h b/include/leveldb/options.h index cc50ba1f8..530d8d6d3 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -32,6 +32,19 @@ enum CompressionType { kBZip2Compression = 0x3 }; +// Compression options for different compression algorithms like Zlib +struct CompressionOptions { + int window_bits; + int level; + int strategy; + CompressionOptions():window_bits(-14), + level(-1), + strategy(0){} + CompressionOptions(int wbits, int lev, int strategy):window_bits(wbits), + level(lev), + strategy(strategy){} +}; + // Options to control the behavior of a database (passed to DB::Open) struct Options { // ------------------- @@ -144,10 +157,13 @@ struct Options { // reponsible for allocating memory and initializing the values in it // before invoking Open(). The caller is responsible for freeing this // array and it could be freed anytime after the return from Open(). - // This could have been a std::vector but that makes the equivalent + // This could have been a std::vector but that makes the equivalent // java/C api hard to construct. CompressionType* compression_per_level; + //different options for compression algorithms + CompressionOptions compression_opts; + // If non-NULL, use the specified filter policy to reduce disk reads. // Many applications will benefit from passing the result of // NewBloomFilterPolicy() here. diff --git a/port/port_posix.h b/port/port_posix.h index db4e0b8ca..e2540a426 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -44,6 +44,7 @@ #include #include #include +#include "leveldb/options.h" #include "port/atomic_pointer.h" #ifndef PLATFORM_IS_LITTLE_ENDIAN @@ -131,8 +132,8 @@ typedef pthread_once_t OnceType; #define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT extern void InitOnce(OnceType* once, void (*initializer)()); -inline bool Snappy_Compress(const char* input, size_t length, - ::std::string* output) { +inline bool Snappy_Compress(const CompressionOptions& opts, const char* input, + size_t length, ::std::string* output) { #ifdef SNAPPY output->resize(snappy::MaxCompressedLength(length)); size_t outlen; @@ -162,9 +163,8 @@ inline bool Snappy_Uncompress(const char* input, size_t length, #endif } -inline bool Zlib_Compress(const char* input, size_t length, - ::std::string* output, int windowBits = -14, int level = -1, - int strategy = 0) { +inline bool Zlib_Compress(const CompressionOptions& opts, const char* input, + size_t length, ::std::string* output) { #ifdef ZLIB // The memLevel parameter specifies how much memory should be allocated for // the internal compression state. @@ -174,8 +174,8 @@ inline bool Zlib_Compress(const char* input, size_t length, static const int memLevel = 8; z_stream _stream; memset(&_stream, 0, sizeof(z_stream)); - int st = deflateInit2(&_stream, level, Z_DEFLATED, windowBits, - memLevel, strategy); + int st = deflateInit2(&_stream, opts.level, Z_DEFLATED, opts.window_bits, + memLevel, opts.strategy); if (st != Z_OK) { return false; } @@ -284,8 +284,8 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length, return NULL; } -inline bool BZip2_Compress(const char* input, size_t length, - ::std::string* output) { +inline bool BZip2_Compress(const CompressionOptions& opts, const char* input, + size_t length, ::std::string* output) { #ifdef BZIP2 bz_stream _stream; memset(&_stream, 0, sizeof(bz_stream)); diff --git a/table/table_builder.cc b/table/table_builder.cc index 5306b2976..d867a1ca9 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -175,7 +175,8 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { case kSnappyCompression: { std::string* compressed = &r->compressed_output; - if (port::Snappy_Compress(raw.data(), raw.size(), compressed) && + if (port::Snappy_Compress(r->options.compression_opts, raw.data(), + raw.size(), compressed) && GoodCompressionRatio(compressed->size(), raw.size())) { block_contents = *compressed; } else { @@ -187,7 +188,8 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { break; } case kZlibCompression: - if (port::Zlib_Compress(raw.data(), raw.size(), compressed) && + if (port::Zlib_Compress(r->options.compression_opts, raw.data(), + raw.size(), compressed) && GoodCompressionRatio(compressed->size(), raw.size())) { block_contents = *compressed; } else { @@ -198,7 +200,8 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { } break; case kBZip2Compression: - if (port::BZip2_Compress(raw.data(), raw.size(), compressed) && + if (port::BZip2_Compress(r->options.compression_opts, raw.data(), + raw.size(), compressed) && GoodCompressionRatio(compressed->size(), raw.size())) { block_contents = *compressed; } else { diff --git a/table/table_test.cc b/table/table_test.cc index dd0ba6d5f..c78c4689d 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -247,6 +247,7 @@ class TableConstructor: public Constructor { source_ = new StringSource(sink.contents()); Options table_options; table_options.comparator = options.comparator; + table_options.compression_opts = options.compression_opts; return Table::Open(table_options, source_, sink.contents().size(), &table_); } @@ -399,19 +400,22 @@ class DBConstructor: public Constructor { static bool SnappyCompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Snappy_Compress(in.data(), in.size(), &out); + return port::Snappy_Compress(Options().compression_opts, in.data(), in.size(), + &out); } static bool ZlibCompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Zlib_Compress(in.data(), in.size(), &out); + return port::Zlib_Compress(Options().compression_opts, in.data(), in.size(), + &out); } static bool BZip2CompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::BZip2_Compress(in.data(), in.size(), &out); + return port::BZip2_Compress(Options().compression_opts, in.data(), in.size(), + &out); } enum TestType { diff --git a/util/options.cc b/util/options.cc index 4a2d53456..765b72e5e 100644 --- a/util/options.cc +++ b/util/options.cc @@ -71,7 +71,7 @@ Options::Dump( Log(log,"Options.block_restart_interval: %d", block_restart_interval); if (compression_per_level != NULL) { for (unsigned int i = 0; i < num_levels; i++){ - Log(log," Options.compression[%d]: %d", + Log(log," Options.compression[%d]: %d", i, compression_per_level[i]); } } else { @@ -85,6 +85,12 @@ Options::Dump( Log(log," Options.max_log_file_size: %d", max_log_file_size); Log(log," Options.db_stats_log_interval: %d", db_stats_log_interval); + Log(log," Options.compression_opts.window_bits: %d", + compression_opts.window_bits); + Log(log," Options.compression_opts.level: %d", + compression_opts.level); + Log(log," Options.compression_opts.strategy: %d", + compression_opts.strategy); Log(log," Options.level0_file_num_compaction_trigger: %d", level0_file_num_compaction_trigger); Log(log," Options.level0_slowdown_writes_trigger: %d", From a1bd5b7752c91b17453742d0323b647d1c585f60 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Sun, 4 Nov 2012 22:04:14 -0800 Subject: [PATCH 6/6] Compilation problem introduced by previous commit 854c66b089bef5d27f79750884f70f6e2c8c69da. Summary: Compilation problem introduced by previous commit 854c66b089bef5d27f79750884f70f6e2c8c69da. Test Plan: make check --- db/db_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index d81a88369..491e88072 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1107,8 +1107,7 @@ void MinLevelHelper(DBTest* self, Options& options) { void MinLevelToCompress(CompressionType& type, Options& options, int wbits, int lev, int strategy) { - fprintf(stderr, "Test with compression options : window_bits = %d, level = %d - , strategy = %d}\n", wbits, lev, strategy); + fprintf(stderr, "Test with compression options : window_bits = %d, level = %d, strategy = %d}\n", wbits, lev, strategy); options.write_buffer_size = 100<<10; //100KB options.num_levels = 3; options.max_mem_compaction_level = 0;