Merge branch 'master' into performance

Conflicts:
	db/db_impl.cc
	util/options.cc
main
Dhruba Borthakur 12 years ago
commit 81f735d97c
  1. 7
      db/c.cc
  2. 1
      db/c_test.c
  3. 38
      db/db_bench.cc
  4. 20
      db/db_impl.cc
  5. 208
      db/db_test.cc
  6. 2
      db/table_cache.cc
  7. 26
      include/leveldb/options.h
  8. 18
      port/port_posix.h
  9. 9
      table/table_builder.cc
  10. 10
      table/table_test.cc
  11. 32
      util/options.cc

@ -502,6 +502,13 @@ void leveldb_options_set_compression(leveldb_options_t* opt, int t) {
opt->rep.compression = static_cast<CompressionType>(t); opt->rep.compression = static_cast<CompressionType>(t);
} }
void leveldb_options_set_compression_options(
leveldb_options_t* opt, int w_bits, int level, int strategy) {
opt->rep.compression_opts.window_bits = w_bits;
opt->rep.compression_opts.level = level;
opt->rep.compression_opts.strategy = strategy;
}
void leveldb_options_set_disable_data_sync( void leveldb_options_set_disable_data_sync(
leveldb_options_t* opt, bool disable_data_sync) { leveldb_options_t* opt, bool disable_data_sync) {
opt->rep.disableDataSync = disable_data_sync; opt->rep.disableDataSync = disable_data_sync;

@ -187,6 +187,7 @@ int main(int argc, char** argv) {
leveldb_options_set_block_size(options, 1024); leveldb_options_set_block_size(options, 1024);
leveldb_options_set_block_restart_interval(options, 8); leveldb_options_set_block_restart_interval(options, 8);
leveldb_options_set_compression(options, leveldb_no_compression); leveldb_options_set_compression(options, leveldb_no_compression);
leveldb_options_set_compression_options(options, -14, -1, 0);
roptions = leveldb_readoptions_create(); roptions = leveldb_readoptions_create();
leveldb_readoptions_set_verify_checksums(roptions, 1); leveldb_readoptions_set_verify_checksums(roptions, 1);

@ -189,10 +189,12 @@ static uint64_t FLAGS_delete_obsolete_files_period_micros = 0;
static enum leveldb::CompressionType FLAGS_compression_type = static enum leveldb::CompressionType FLAGS_compression_type =
leveldb::kSnappyCompression; leveldb::kSnappyCompression;
// Allows compression for levels 0 and 1 to be disabled when // Allows compression for levels 0 and 1 to be disabled when
// other levels are compressed // other levels are compressed
static int FLAGS_min_level_to_compress = -1; static int FLAGS_min_level_to_compress = -1;
static int FLAGS_table_cache_numshardbits = 4;
// posix or hdfs environment // posix or hdfs environment
static leveldb::Env* FLAGS_env = leveldb::Env::Default(); static leveldb::Env* FLAGS_env = leveldb::Env::Default();
@ -518,15 +520,18 @@ class Benchmark {
switch (FLAGS_compression_type) { switch (FLAGS_compression_type) {
case kSnappyCompression: case kSnappyCompression:
result = port::Snappy_Compress(text, strlen(text), &compressed); result = port::Snappy_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "Snappy"; name = "Snappy";
break; break;
case kZlibCompression: case kZlibCompression:
result = port::Zlib_Compress(text, strlen(text), &compressed); result = port::Zlib_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "Zlib"; name = "Zlib";
break; break;
case kBZip2Compression: case kBZip2Compression:
result = port::BZip2_Compress(text, strlen(text), &compressed); result = port::BZip2_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "BZip2"; name = "BZip2";
break; break;
} }
@ -864,7 +869,8 @@ class Benchmark {
bool ok = true; bool ok = true;
std::string compressed; std::string compressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Compress(input.data(), input.size(), &compressed); ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
produced += compressed.size(); produced += compressed.size();
bytes += input.size(); bytes += input.size();
thread->stats.FinishedSingleOp(NULL); thread->stats.FinishedSingleOp(NULL);
@ -885,7 +891,8 @@ class Benchmark {
RandomGenerator gen; RandomGenerator gen;
Slice input = gen.Generate(Options().block_size); Slice input = gen.Generate(Options().block_size);
std::string compressed; std::string compressed;
bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed); bool ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
int64_t bytes = 0; int64_t bytes = 0;
char* uncompressed = new char[input.size()]; char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G while (ok && bytes < 1024 * 1048576) { // Compress 1G
@ -908,6 +915,9 @@ class Benchmark {
Options options; Options options;
options.create_if_missing = !FLAGS_use_existing_db; options.create_if_missing = !FLAGS_use_existing_db;
options.block_cache = cache_; options.block_cache = cache_;
if (cache_ == NULL) {
options.no_block_cache = true;
}
options.write_buffer_size = FLAGS_write_buffer_size; options.write_buffer_size = FLAGS_write_buffer_size;
options.max_write_buffer_number = FLAGS_max_write_buffer_number; options.max_write_buffer_number = FLAGS_max_write_buffer_number;
options.max_background_compactions = FLAGS_max_background_compactions; options.max_background_compactions = FLAGS_max_background_compactions;
@ -936,7 +946,7 @@ class Benchmark {
for (unsigned int i = 0; i < FLAGS_min_level_to_compress; i++) { for (unsigned int i = 0; i < FLAGS_min_level_to_compress; i++) {
options.compression_per_level[i] = kNoCompression; options.compression_per_level[i] = kNoCompression;
} }
for (unsigned int i = FLAGS_min_level_to_compress; for (unsigned int i = FLAGS_min_level_to_compress;
i < FLAGS_num_levels; i++) { i < FLAGS_num_levels; i++) {
options.compression_per_level[i] = FLAGS_compression_type; options.compression_per_level[i] = FLAGS_compression_type;
} }
@ -945,6 +955,7 @@ class Benchmark {
options.delete_obsolete_files_period_micros = options.delete_obsolete_files_period_micros =
FLAGS_delete_obsolete_files_period_micros; FLAGS_delete_obsolete_files_period_micros;
options.rate_limit = FLAGS_rate_limit; options.rate_limit = FLAGS_rate_limit;
options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
Status s = DB::Open(options, FLAGS_db, &db_); Status s = DB::Open(options, FLAGS_db, &db_);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str()); fprintf(stderr, "open error: %s\n", s.ToString().c_str());
@ -1276,6 +1287,13 @@ int main(int argc, char** argv) {
fprintf(stderr, "The cache cannot be sharded into 2**%d pieces\n", n); fprintf(stderr, "The cache cannot be sharded into 2**%d pieces\n", n);
exit(1); exit(1);
} }
} else if (sscanf(argv[i], "--table_cache_numshardbits=%d%c",
&n, &junk) == 1) {
if (n <= 0 || n > 20) {
fprintf(stderr, "The cache cannot be sharded into 2**%d pieces\n", n);
exit(1);
}
FLAGS_table_cache_numshardbits = n;
} else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
FLAGS_bloom_bits = n; FLAGS_bloom_bits = n;
} else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
@ -1359,8 +1377,8 @@ int main(int argc, char** argv) {
else { else {
fprintf(stdout, "Cannot parse %s\n", argv[i]); fprintf(stdout, "Cannot parse %s\n", argv[i]);
} }
} else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1 } else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1
&& n >= 0) { && n >= 0) {
FLAGS_min_level_to_compress = n; FLAGS_min_level_to_compress = n;
} else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1 } else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1
&& (n == 0 || n == 1)) { && (n == 0 || n == 1)) {

@ -155,7 +155,7 @@ Options SanitizeOptions(const std::string& dbname,
result.info_log = NULL; result.info_log = NULL;
} }
} }
if (result.block_cache == NULL) { if (result.block_cache == NULL && !result.no_block_cache) {
result.block_cache = NewLRUCache(8 << 20); result.block_cache = NewLRUCache(8 << 20);
} }
if (src.compression_per_level != NULL) { if (src.compression_per_level != NULL) {
@ -1707,9 +1707,14 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// this delay hands over some CPU to the compaction thread in // this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer. // case it is sharing the same core as the writer.
mutex_.Unlock(); mutex_.Unlock();
uint64_t t1 = env_->NowMicros();
env_->SleepForMicroseconds(1000); env_->SleepForMicroseconds(1000);
stall_level0_slowdown_ += 1000; uint64_t delayed = env_->NowMicros() - t1;
stall_level0_slowdown_ += delayed;
allow_delay = false; // Do not delay a single write more than once allow_delay = false; // Do not delay a single write more than once
//Log(options_.info_log,
// "delaying write %llu usecs for level0_slowdown_writes_trigger\n",
// delayed);
mutex_.Lock(); mutex_.Lock();
delayed_writes_++; delayed_writes_++;
} else if (!force && } else if (!force &&
@ -1741,11 +1746,14 @@ Status DBImpl::MakeRoomForWrite(bool force) {
(score = versions_->MaxCompactionScore()) > options_.rate_limit) { (score = versions_->MaxCompactionScore()) > options_.rate_limit) {
// Delay a write when the compaction score for any level is too large. // Delay a write when the compaction score for any level is too large.
mutex_.Unlock(); mutex_.Unlock();
uint64_t t1 = env_->NowMicros();
env_->SleepForMicroseconds(1000); env_->SleepForMicroseconds(1000);
stall_leveln_slowdown_ += 1000; uint64_t delayed = env_->NowMicros() - t1;
stall_leveln_slowdown_ += delayed;
allow_delay = false; // Do not delay a single write more than once allow_delay = false; // Do not delay a single write more than once
Log(options_.info_log, Log(options_.info_log,
"delaying write for rate limits with max score %.2f\n", score); "delaying write %llu usecs for rate limits with max score %.2f\n",
delayed, score);
mutex_.Lock(); mutex_.Lock();
} else { } else {
// Attempt to switch to a new memtable and trigger compaction of old // Attempt to switch to a new memtable and trigger compaction of old
@ -1930,6 +1938,10 @@ Status DB::Open(const Options& options, const std::string& dbname,
DB** dbptr) { DB** dbptr) {
*dbptr = NULL; *dbptr = NULL;
if (options.block_cache != NULL && options.no_block_cache) {
return Status::InvalidArgument(
"no_block_cache is true while block_cache is not NULL");
}
DBImpl* impl = new DBImpl(options, dbname); DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock(); impl->mutex_.Lock();
VersionEdit edit(impl->NumberLevels()); VersionEdit edit(impl->NumberLevels());

@ -20,23 +20,23 @@
namespace leveldb { namespace leveldb {
static bool SnappyCompressionSupported() { static bool SnappyCompressionSupported(const CompressionOptions& options) {
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(in.data(), in.size(), &out); return port::Snappy_Compress(options, in.data(), in.size(), &out);
} }
static bool ZlibCompressionSupported() { static bool ZlibCompressionSupported(const CompressionOptions& options) {
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Zlib_Compress(in.data(), in.size(), &out); return port::Zlib_Compress(options, in.data(), in.size(), &out);
} }
static bool BZip2CompressionSupported() { static bool BZip2CompressionSupported(const CompressionOptions& options) {
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::BZip2_Compress(in.data(), in.size(), &out); return port::BZip2_Compress(options, in.data(), in.size(), &out);
} }
static std::string RandomString(Random* rnd, int len) { static std::string RandomString(Random* rnd, int len) {
std::string r; std::string r;
@ -1076,57 +1076,59 @@ TEST(DBTest, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1), 1); ASSERT_EQ(NumTableFilesAtLevel(1), 1);
} }
void MinLevelHelper(DBTest* self, Options& options) { void MinLevelHelper(DBTest* self, Options& options) {
Random rnd(301); Random rnd(301);
for (int num = 0; for (int num = 0;
num < options.level0_file_num_compaction_trigger - 1; num < options.level0_file_num_compaction_trigger - 1;
num++) num++)
{ {
std::vector<std::string> values; std::vector<std::string> values;
// Write 120KB (12 values, each 10K) // Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000)); values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(self->Put(Key(i), values[i])); ASSERT_OK(self->Put(Key(i), values[i]));
} }
self->dbfull()->TEST_WaitForCompactMemTable(); self->dbfull()->TEST_WaitForCompactMemTable();
ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1); ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1);
} }
//generate one more file in level-0, and should trigger level-0 compaction //generate one more file in level-0, and should trigger level-0 compaction
std::vector<std::string> values; std::vector<std::string> values;
for (int i = 0; i < 12; i++) { for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000)); values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(self->Put(Key(i), values[i])); ASSERT_OK(self->Put(Key(i), values[i]));
} }
self->dbfull()->TEST_WaitForCompact(); self->dbfull()->TEST_WaitForCompact();
ASSERT_EQ(self->NumTableFilesAtLevel(0), 0); ASSERT_EQ(self->NumTableFilesAtLevel(0), 0);
ASSERT_EQ(self->NumTableFilesAtLevel(1), 1); ASSERT_EQ(self->NumTableFilesAtLevel(1), 1);
} }
TEST(DBTest, MinLevelToCompress) { void MinLevelToCompress(CompressionType& type, Options& options, int wbits,
Options options = CurrentOptions(); int lev, int strategy) {
options.write_buffer_size = 100<<10; //100KB fprintf(stderr, "Test with compression options : window_bits = %d, level = %d, strategy = %d}\n", wbits, lev, strategy);
options.num_levels = 3; options.write_buffer_size = 100<<10; //100KB
options.max_mem_compaction_level = 0; options.num_levels = 3;
options.level0_file_num_compaction_trigger = 3; options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 3;
options.create_if_missing = true; options.create_if_missing = true;
CompressionType type;
if (SnappyCompressionSupported(CompressionOptions(wbits, lev, strategy))) {
if (SnappyCompressionSupported()) { type = kSnappyCompression;
type = kSnappyCompression; fprintf(stderr, "using snappy\n");
fprintf(stderr, "using snappy\n"); } else if (ZlibCompressionSupported(
} else if (ZlibCompressionSupported()) { CompressionOptions(wbits, lev, strategy))) {
type = kZlibCompression; type = kZlibCompression;
fprintf(stderr, "using zlib\n"); fprintf(stderr, "using zlib\n");
} else if (BZip2CompressionSupported()) { } else if (BZip2CompressionSupported(
type = kBZip2Compression; CompressionOptions(wbits, lev, strategy))) {
fprintf(stderr, "using bzip2\n"); type = kBZip2Compression;
} else { fprintf(stderr, "using bzip2\n");
fprintf(stderr, "skipping test, compression disabled\n"); } else {
return; fprintf(stderr, "skipping test, compression disabled\n");
} return;
}
options.compression_per_level = new CompressionType[options.num_levels]; options.compression_per_level = new CompressionType[options.num_levels];
// do not compress L0 // do not compress L0
@ -1136,9 +1138,32 @@ TEST(DBTest, MinLevelToCompress) {
for (int i = 1; i < options.num_levels; i++) { for (int i = 1; i < options.num_levels; i++) {
options.compression_per_level[i] = type; options.compression_per_level[i] = type;
} }
Reopen(&options); }
MinLevelHelper(this, options); TEST(DBTest, MinLevelToCompress1) {
Options options = CurrentOptions();
CompressionType type;
MinLevelToCompress(type, options, -14, -1, 0);
Reopen(&options);
MinLevelHelper(this, options);
// do not compress L0 and L1
for (int i = 0; i < 2; i++) {
options.compression_per_level[i] = kNoCompression;
}
for (int i = 2; i < options.num_levels; i++) {
options.compression_per_level[i] = type;
}
DestroyAndReopen(&options);
MinLevelHelper(this, options);
}
TEST(DBTest, MinLevelToCompress2) {
Options options = CurrentOptions();
CompressionType type;
MinLevelToCompress(type, options, 15, -1, 0);
Reopen(&options);
MinLevelHelper(this, options);
// do not compress L0 and L1 // do not compress L0 and L1
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
options.compression_per_level[i] = kNoCompression; options.compression_per_level[i] = kNoCompression;
@ -1146,9 +1171,9 @@ TEST(DBTest, MinLevelToCompress) {
for (int i = 2; i < options.num_levels; i++) { for (int i = 2; i < options.num_levels; i++) {
options.compression_per_level[i] = type; options.compression_per_level[i] = type;
} }
DestroyAndReopen(&options); DestroyAndReopen(&options);
MinLevelHelper(this, options); MinLevelHelper(this, options);
} }
TEST(DBTest, RepeatedWritesToSameKey) { TEST(DBTest, RepeatedWritesToSameKey) {
Options options = CurrentOptions(); Options options = CurrentOptions();
@ -1682,6 +1707,29 @@ TEST(DBTest, DBOpen_Options) {
db = NULL; db = NULL;
} }
TEST(DBTest, DBOpen_Change_NumLevels) {
std::string dbname = test::TmpDir() + "/db_change_num_levels";
DestroyDB(dbname, Options());
Options opts;
Status s;
DB* db = NULL;
opts.create_if_missing = true;
s = DB::Open(opts, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != NULL);
db->Put(WriteOptions(), "a", "123");
db->Put(WriteOptions(), "b", "234");
db->CompactRange(NULL, NULL);
delete db;
db = NULL;
opts.create_if_missing = false;
opts.num_levels = 2;
s = DB::Open(opts, dbname, &db);
ASSERT_TRUE(strstr(s.ToString().c_str(), "Corruption") != NULL);
ASSERT_TRUE(db == NULL);
}
// Check that number of files does not grow when we are out of space // Check that number of files does not grow when we are out of space
TEST(DBTest, NoSpace) { TEST(DBTest, NoSpace) {
Options options = CurrentOptions(); Options options = CurrentOptions();
@ -1828,7 +1876,7 @@ TEST(DBTest, SnapshotFiles) {
uint64_t size; uint64_t size;
ASSERT_OK(env_->GetFileSize(src, &size)); ASSERT_OK(env_->GetFileSize(src, &size));
// record the number and the size of the // record the number and the size of the
// latest manifest file // latest manifest file
if (ParseFileName(files[i].substr(1), &number, &type)) { if (ParseFileName(files[i].substr(1), &number, &type)) {
if (type == kDescriptorFile) { if (type == kDescriptorFile) {
@ -1843,7 +1891,7 @@ TEST(DBTest, SnapshotFiles) {
ASSERT_OK(env_->NewSequentialFile(src, &srcfile)); ASSERT_OK(env_->NewSequentialFile(src, &srcfile));
WritableFile* destfile; WritableFile* destfile;
ASSERT_OK(env_->NewWritableFile(dest, &destfile)); ASSERT_OK(env_->NewWritableFile(dest, &destfile));
char buffer[4096]; char buffer[4096];
Slice slice; Slice slice;
while (size > 0) { while (size > 0) {
@ -1866,7 +1914,7 @@ TEST(DBTest, SnapshotFiles) {
extras.push_back(RandomString(&rnd, 100000)); extras.push_back(RandomString(&rnd, 100000));
ASSERT_OK(Put(Key(i), extras[i])); ASSERT_OK(Put(Key(i), extras[i]));
} }
// verify that data in the snapshot are correct // verify that data in the snapshot are correct
Options opts; Options opts;
DB* snapdb; DB* snapdb;
@ -1882,7 +1930,7 @@ TEST(DBTest, SnapshotFiles) {
} }
delete snapdb; delete snapdb;
// look at the new live files after we added an 'extra' key // look at the new live files after we added an 'extra' key
// and after we took the first snapshot. // and after we took the first snapshot.
uint64_t new_manifest_number = 0; uint64_t new_manifest_number = 0;
uint64_t new_manifest_size = 0; uint64_t new_manifest_size = 0;
@ -1896,7 +1944,7 @@ TEST(DBTest, SnapshotFiles) {
// previous shapshot. // previous shapshot.
for (unsigned int i = 0; i < newfiles.size(); i++) { for (unsigned int i = 0; i < newfiles.size(); i++) {
std::string src = dbname_ + "/" + newfiles[i]; std::string src = dbname_ + "/" + newfiles[i];
// record the lognumber and the size of the // record the lognumber and the size of the
// latest manifest file // latest manifest file
if (ParseFileName(newfiles[i].substr(1), &number, &type)) { if (ParseFileName(newfiles[i].substr(1), &number, &type)) {
if (type == kDescriptorFile) { if (type == kDescriptorFile) {
@ -1911,7 +1959,7 @@ TEST(DBTest, SnapshotFiles) {
} }
ASSERT_EQ(manifest_number, new_manifest_number); ASSERT_EQ(manifest_number, new_manifest_number);
ASSERT_GT(new_manifest_size, manifest_size); ASSERT_GT(new_manifest_size, manifest_size);
// release file snapshot // release file snapshot
dbfull()->DisableFileDeletions(); dbfull()->DisableFileDeletions();
} }
@ -1975,7 +2023,7 @@ TEST(DBTest, ReadCompaction) {
// in some level, indicating that there was a compaction // in some level, indicating that there was a compaction
ASSERT_TRUE(NumTableFilesAtLevel(0) < l1 || ASSERT_TRUE(NumTableFilesAtLevel(0) < l1 ||
NumTableFilesAtLevel(1) < l2 || NumTableFilesAtLevel(1) < l2 ||
NumTableFilesAtLevel(2) < l3); NumTableFilesAtLevel(2) < l3);
delete options.block_cache; delete options.block_cache;
} }
} }

@ -39,7 +39,7 @@ TableCache::TableCache(const std::string& dbname,
: env_(options->env), : env_(options->env),
dbname_(dbname), dbname_(dbname),
options_(options), options_(options),
cache_(NewLRUCache(entries)) { cache_(NewLRUCache(entries, options->table_cache_numshardbits)) {
dbstatistics = (DBStatistics*)options->statistics; dbstatistics = (DBStatistics*)options->statistics;
} }

@ -32,6 +32,19 @@ enum CompressionType {
kBZip2Compression = 0x3 kBZip2Compression = 0x3
}; };
// Compression options for different compression algorithms like Zlib
struct CompressionOptions {
int window_bits;
int level;
int strategy;
CompressionOptions():window_bits(-14),
level(-1),
strategy(0){}
CompressionOptions(int wbits, int lev, int strategy):window_bits(wbits),
level(lev),
strategy(strategy){}
};
// Options to control the behavior of a database (passed to DB::Open) // Options to control the behavior of a database (passed to DB::Open)
struct Options { struct Options {
// ------------------- // -------------------
@ -151,10 +164,13 @@ struct Options {
// reponsible for allocating memory and initializing the values in it // reponsible for allocating memory and initializing the values in it
// before invoking Open(). The caller is responsible for freeing this // before invoking Open(). The caller is responsible for freeing this
// array and it could be freed anytime after the return from Open(). // array and it could be freed anytime after the return from Open().
// This could have been a std::vector but that makes the equivalent // This could have been a std::vector but that makes the equivalent
// java/C api hard to construct. // java/C api hard to construct.
CompressionType* compression_per_level; CompressionType* compression_per_level;
//different options for compression algorithms
CompressionOptions compression_opts;
// If non-NULL, use the specified filter policy to reduce disk reads. // If non-NULL, use the specified filter policy to reduce disk reads.
// Many applications will benefit from passing the result of // Many applications will benefit from passing the result of
// NewBloomFilterPolicy() here. // NewBloomFilterPolicy() here.
@ -282,6 +298,14 @@ struct Options {
// exceeds rate_limit. This is ignored when <= 1.0. // exceeds rate_limit. This is ignored when <= 1.0.
double rate_limit; double rate_limit;
// Disable block cache. If this is set to false,
// then no block cache should be used, and the block_cache should
// point to a NULL object.
bool no_block_cache;
// Number of shards used for table cache.
int table_cache_numshardbits;
// Create an Options object with default values for all fields. // Create an Options object with default values for all fields.
Options(); Options();

@ -44,6 +44,7 @@
#include <stdint.h> #include <stdint.h>
#include <string> #include <string>
#include <string.h> #include <string.h>
#include "leveldb/options.h"
#include "port/atomic_pointer.h" #include "port/atomic_pointer.h"
#ifndef PLATFORM_IS_LITTLE_ENDIAN #ifndef PLATFORM_IS_LITTLE_ENDIAN
@ -131,8 +132,8 @@ typedef pthread_once_t OnceType;
#define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT #define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT
extern void InitOnce(OnceType* once, void (*initializer)()); extern void InitOnce(OnceType* once, void (*initializer)());
inline bool Snappy_Compress(const char* input, size_t length, inline bool Snappy_Compress(const CompressionOptions& opts, const char* input,
::std::string* output) { size_t length, ::std::string* output) {
#ifdef SNAPPY #ifdef SNAPPY
output->resize(snappy::MaxCompressedLength(length)); output->resize(snappy::MaxCompressedLength(length));
size_t outlen; size_t outlen;
@ -162,9 +163,8 @@ inline bool Snappy_Uncompress(const char* input, size_t length,
#endif #endif
} }
inline bool Zlib_Compress(const char* input, size_t length, inline bool Zlib_Compress(const CompressionOptions& opts, const char* input,
::std::string* output, int windowBits = -14, int level = -1, size_t length, ::std::string* output) {
int strategy = 0) {
#ifdef ZLIB #ifdef ZLIB
// The memLevel parameter specifies how much memory should be allocated for // The memLevel parameter specifies how much memory should be allocated for
// the internal compression state. // the internal compression state.
@ -174,8 +174,8 @@ inline bool Zlib_Compress(const char* input, size_t length,
static const int memLevel = 8; static const int memLevel = 8;
z_stream _stream; z_stream _stream;
memset(&_stream, 0, sizeof(z_stream)); memset(&_stream, 0, sizeof(z_stream));
int st = deflateInit2(&_stream, level, Z_DEFLATED, windowBits, int st = deflateInit2(&_stream, opts.level, Z_DEFLATED, opts.window_bits,
memLevel, strategy); memLevel, opts.strategy);
if (st != Z_OK) { if (st != Z_OK) {
return false; return false;
} }
@ -284,8 +284,8 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
return NULL; return NULL;
} }
inline bool BZip2_Compress(const char* input, size_t length, inline bool BZip2_Compress(const CompressionOptions& opts, const char* input,
::std::string* output) { size_t length, ::std::string* output) {
#ifdef BZIP2 #ifdef BZIP2
bz_stream _stream; bz_stream _stream;
memset(&_stream, 0, sizeof(bz_stream)); memset(&_stream, 0, sizeof(bz_stream));

@ -175,7 +175,8 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
case kSnappyCompression: { case kSnappyCompression: {
std::string* compressed = &r->compressed_output; std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) && if (port::Snappy_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) { GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed; block_contents = *compressed;
} else { } else {
@ -187,7 +188,8 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
break; break;
} }
case kZlibCompression: case kZlibCompression:
if (port::Zlib_Compress(raw.data(), raw.size(), compressed) && if (port::Zlib_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) { GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed; block_contents = *compressed;
} else { } else {
@ -198,7 +200,8 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
} }
break; break;
case kBZip2Compression: case kBZip2Compression:
if (port::BZip2_Compress(raw.data(), raw.size(), compressed) && if (port::BZip2_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) { GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed; block_contents = *compressed;
} else { } else {

@ -247,6 +247,7 @@ class TableConstructor: public Constructor {
source_ = new StringSource(sink.contents()); source_ = new StringSource(sink.contents());
Options table_options; Options table_options;
table_options.comparator = options.comparator; table_options.comparator = options.comparator;
table_options.compression_opts = options.compression_opts;
return Table::Open(table_options, source_, sink.contents().size(), &table_); return Table::Open(table_options, source_, sink.contents().size(), &table_);
} }
@ -399,19 +400,22 @@ class DBConstructor: public Constructor {
static bool SnappyCompressionSupported() { static bool SnappyCompressionSupported() {
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(in.data(), in.size(), &out); return port::Snappy_Compress(Options().compression_opts, in.data(), in.size(),
&out);
} }
static bool ZlibCompressionSupported() { static bool ZlibCompressionSupported() {
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Zlib_Compress(in.data(), in.size(), &out); return port::Zlib_Compress(Options().compression_opts, in.data(), in.size(),
&out);
} }
static bool BZip2CompressionSupported() { static bool BZip2CompressionSupported() {
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::BZip2_Compress(in.data(), in.size(), &out); return port::BZip2_Compress(Options().compression_opts, in.data(), in.size(),
&out);
} }
enum TestType { enum TestType {

@ -44,10 +44,12 @@ Options::Options()
db_stats_log_interval(1800), db_stats_log_interval(1800),
db_log_dir(""), db_log_dir(""),
disable_seek_compaction(false), disable_seek_compaction(false),
delete_obsolete_files_period_micros(0), no_block_cache(false),
max_background_compactions(1), table_cache_numshardbits(4),
max_log_file_size(0), max_log_file_size(0),
rate_limit(0.0) { delete_obsolete_files_period_micros(0),
rate_limit(0.0),
max_background_compactions(1) {
} }
void void
@ -64,18 +66,15 @@ Options::Dump(
Log(log," Options.max_write_buffer_number: %zd", max_write_buffer_number); Log(log," Options.max_write_buffer_number: %zd", max_write_buffer_number);
Log(log," Options.max_open_files: %d", max_open_files); Log(log," Options.max_open_files: %d", max_open_files);
Log(log," Options.block_cache: %p", block_cache); Log(log," Options.block_cache: %p", block_cache);
Log(log," Options.block_cache_size: %zd", block_cache->GetCapacity()); if (block_cache) {
Log(log," Options.block_cache_size: %zd",
block_cache->GetCapacity());
}
Log(log," Options.block_size: %zd", block_size); Log(log," Options.block_size: %zd", block_size);
Log(log," Options.block_restart_interval: %d", block_restart_interval); Log(log," Options.block_restart_interval: %d", block_restart_interval);
Log(log," Options.compression: %d", compression);
Log(log," Options.filter_policy: %s",
filter_policy == NULL ? "NULL" : filter_policy->Name());
Log(log," Options.num_levels: %d", num_levels);
Log(log," Options.disableDataSync: %d", disableDataSync);
Log(log," Options.use_fsync: %d", use_fsync);
if (compression_per_level != NULL) { if (compression_per_level != NULL) {
for (unsigned int i = 0; i < num_levels; i++){ for (unsigned int i = 0; i < num_levels; i++){
Log(log," Options.compression[%d]: %d", Log(log," Options.compression[%d]: %d",
i, compression_per_level[i]); i, compression_per_level[i]);
} }
} else { } else {
@ -89,6 +88,12 @@ Options::Dump(
Log(log," Options.max_log_file_size: %d", max_log_file_size); Log(log," Options.max_log_file_size: %d", max_log_file_size);
Log(log," Options.db_stats_log_interval: %d", Log(log," Options.db_stats_log_interval: %d",
db_stats_log_interval); db_stats_log_interval);
Log(log," Options.compression_opts.window_bits: %d",
compression_opts.window_bits);
Log(log," Options.compression_opts.level: %d",
compression_opts.level);
Log(log," Options.compression_opts.strategy: %d",
compression_opts.strategy);
Log(log," Options.level0_file_num_compaction_trigger: %d", Log(log," Options.level0_file_num_compaction_trigger: %d",
level0_file_num_compaction_trigger); level0_file_num_compaction_trigger);
Log(log," Options.level0_slowdown_writes_trigger: %d", Log(log," Options.level0_slowdown_writes_trigger: %d",
@ -113,6 +118,10 @@ Options::Dump(
db_log_dir.c_str()); db_log_dir.c_str());
Log(log," Options.disable_seek_compaction: %d", Log(log," Options.disable_seek_compaction: %d",
disable_seek_compaction); disable_seek_compaction);
Log(log," Options.no_block_cache: %d",
no_block_cache);
Log(log," Options.table_cache_numshardbits: %d",
table_cache_numshardbits);
Log(log," Options.delete_obsolete_files_period_micros: %ld", Log(log," Options.delete_obsolete_files_period_micros: %ld",
delete_obsolete_files_period_micros); delete_obsolete_files_period_micros);
Log(log," Options.max_background_compactions: %d", Log(log," Options.max_background_compactions: %d",
@ -121,5 +130,4 @@ Options::Dump(
rate_limit); rate_limit);
} // Options::Dump } // Options::Dump
} // namespace leveldb } // namespace leveldb

Loading…
Cancel
Save