diff --git a/build_detect_platform b/build_detect_platform index 582cdcfd3..27cc75eb4 100755 --- a/build_detect_platform +++ b/build_detect_platform @@ -120,12 +120,12 @@ esac # prune take effect. DIRS="util db table" if test "$USE_THRIFT"; then - DIRS+=" thrift/server_utils.cpp thrift/gen-cpp " + DIRS="$DIRS thrift/server_utils.cpp thrift/gen-cpp " THRIFTSERVER=leveldb_server fi if test "$USE_SCRIBE"; then - DIRS+=" scribe " + DIRS="$DIRS scribe " fi set -f # temporarily disable globbing so that our patterns aren't expanded @@ -204,24 +204,24 @@ if test "$USE_HDFS"; then echo "JAVA_HOME has to be set for HDFS usage." exit 1 fi - HDFS_CCFLAGS+=" -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS" - HDFS_LDFLAGS+=" -Wl,--no-whole-archive hdfs/libhdfs.a -L$JAVA_HOME/jre/lib/amd64" - HDFS_LDFLAGS+=" -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib" - HDFS_LDFLAGS+=" -ldl -lverify -ljava -ljvm" - COMMON_FLAGS+=$HDFS_CCFLAGS - PLATFORM_LDFLAGS+=$HDFS_LDFLAGS + HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS" + HDFS_LDFLAGS="$HDFS_LDFLAGS -Wl,--no-whole-archive hdfs/libhdfs.a -L$JAVA_HOME/jre/lib/amd64" + HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib" + HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm" + COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS" + PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS $HDFS_LDFLAGS" fi # shall we build thrift server or scribe logger if test "$USE_THRIFT" || test "$USE_SCRIBE" ; then THRIFT_CCFLAGS=" -I./thrift -I./thrift/gen-cpp -I./thrift/lib/cpp -I/usr/include -std=gnu++0x" THRIFT_LDFLAGS=" -lexample -lserver -lthrift_base -ltransport -lthrift_exception -lutil -L./thrift/libs " - COMMON_FLAGS+=$THRIFT_CCFLAGS - PLATFORM_LDFLAGS+=$THRIFT_LDFLAGS + COMMON_FLAGS="$COMMON_FLAGS $THRIFT_CCFLAGS" + PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS $THRIFT_LDFLAGS" fi # if Intel SSE instruction set is supported, set USE_SSE=" -msse -msse4.2 " -COMMON_FLAGS+=$USE_SSE +COMMON_FLAGS="$COMMON_FLAGS $USE_SSE" PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS" PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS" diff --git a/db/builder.cc b/db/builder.cc index 4a07aeeb2..1fc200930 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -32,7 +32,7 @@ Status BuildTable(const std::string& dbname, return s; } - TableBuilder* builder = new TableBuilder(options, file); + TableBuilder* builder = new TableBuilder(options, file, 0); meta->smallest.DecodeFrom(iter->key()); for (; iter->Valid(); iter->Next()) { Slice key = iter->key(); diff --git a/db/db_bench.cc b/db/db_bench.cc index 355e3adf4..e6f0fbf12 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -189,6 +189,10 @@ static uint64_t FLAGS_delete_obsolete_files_period_micros = 0; static enum leveldb::CompressionType FLAGS_compression_type = leveldb::kSnappyCompression; +// Allows compression for levels 0 and 1 to be disabled when +// other levels are compressed +static int FLAGS_min_level_to_compress = -1; + // posix or hdfs environment static leveldb::Env* FLAGS_env = leveldb::Env::Default(); @@ -196,6 +200,15 @@ static leveldb::Env* FLAGS_env = leveldb::Env::Default(); // than zero. When 0 the interval grows over time. static int FLAGS_stats_interval = 0; +// Reports additional stats per interval when this is greater +// than 0. +static int FLAGS_stats_per_interval = 0; + +// When not equal to 0 this make threads sleep at each stats +// reporting interval until the compaction score for all levels is +// less than or equal to this value. +static double FLAGS_rate_limit = 0; + extern bool useOsBuffer; extern bool useFsReadAhead; extern bool useMmapRead; @@ -339,17 +352,21 @@ class Stats { } else { double now = FLAGS_env->NowMicros(); fprintf(stderr, - "%s thread %d: (%ld,%ld) ops (interval,total) in %.6f seconds and %.2f ops/sec\n", + "%s ... thread %d: (%ld,%ld) ops and (%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n", FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(), id_, done_ - last_report_done_, done_, - (now - last_report_finish_) / 1000000.0, (done_ - last_report_done_) / - ((now - last_report_finish_) / 1000000.0)); + ((now - last_report_finish_) / 1000000.0), + done_ / ((now - start_) / 1000000.0), + (now - last_report_finish_) / 1000000.0, + (now - start_) / 1000000.0); - std::string stats; - if (db && db->GetProperty("leveldb.stats", &stats)) - fprintf(stderr, stats.c_str()); + if (FLAGS_stats_per_interval) { + std::string stats; + if (db && db->GetProperty("leveldb.stats", &stats)) + fprintf(stderr, stats.c_str()); + } fflush(stderr); next_report_ += FLAGS_stats_interval; @@ -913,14 +930,29 @@ class Benchmark { options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; options.compression = FLAGS_compression_type; + if (FLAGS_min_level_to_compress >= 0) { + assert(FLAGS_min_level_to_compress <= FLAGS_num_levels); + options.compression_per_level = new CompressionType[FLAGS_num_levels]; + for (unsigned int i = 0; i < FLAGS_min_level_to_compress; i++) { + options.compression_per_level[i] = kNoCompression; + } + for (unsigned int i = FLAGS_min_level_to_compress; + i < FLAGS_num_levels; i++) { + options.compression_per_level[i] = FLAGS_compression_type; + } + } options.disable_seek_compaction = FLAGS_disable_seek_compaction; options.delete_obsolete_files_period_micros = FLAGS_delete_obsolete_files_period_micros; + options.rate_limit = FLAGS_rate_limit; Status s = DB::Open(options, FLAGS_db, &db_); if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); exit(1); } + if (FLAGS_min_level_to_compress >= 0) { + delete options.compression_per_level; + } } void WriteSeq(ThreadState* thread) { @@ -1327,6 +1359,9 @@ int main(int argc, char** argv) { else { fprintf(stdout, "Cannot parse %s\n", argv[i]); } + } else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1 + && n >= 0) { + FLAGS_min_level_to_compress = n; } else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { FLAGS_disable_seek_compaction = n; @@ -1336,6 +1371,12 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--stats_interval=%d%c", &n, &junk) == 1 && n >= 0 && n < 2000000000) { FLAGS_stats_interval = n; + } else if (sscanf(argv[i], "--stats_per_interval=%d%c", &n, &junk) == 1 + && (n == 0 || n == 1)) { + FLAGS_stats_per_interval = n; + } else if (sscanf(argv[i], "--rate_limit=%lf%c", &d, &junk) == 1 && + d > 1.0) { + FLAGS_rate_limit = d; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/db/db_impl.cc b/db/db_impl.cc index 2eef46f43..784891e2e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -35,11 +35,40 @@ #include "util/logging.h" #include "util/mutexlock.h" #include "util/build_version.h" +#include "util/auto_split_logger.h" namespace leveldb { void dumpLeveldbBuildVersion(Logger * log); +static Status NewLogger(const std::string& dbname, + const std::string& db_log_dir, + Env* env, + size_t max_log_file_size, + Logger** logger) { + std::string db_absolute_path; + env->GetAbsolutePath(dbname, &db_absolute_path); + + if (max_log_file_size > 0) { // need to auto split the log file? + AutoSplitLogger* auto_split_logger = + new AutoSplitLogger(env, dbname, db_log_dir, max_log_file_size); + Status s = auto_split_logger->GetStatus(); + if (!s.ok()) { + delete auto_split_logger; + } else { + *logger = auto_split_logger; + } + return s; + } else { + // Open a log file in the same directory as the db + env->CreateDir(dbname); // In case it does not exist + std::string fname = InfoLogFileName(dbname, db_absolute_path, db_log_dir); + env->RenameFile(fname, OldInfoLogFileName(dbname, env->NowMicros(), + db_absolute_path, db_log_dir)); + return env->NewLogger(fname, logger); + } +} + // Information kept for every waiting writer struct DBImpl::Writer { Status status; @@ -118,16 +147,9 @@ Options SanitizeOptions(const std::string& dbname, ClipToRange(&result.max_open_files, 20, 50000); ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); ClipToRange(&result.block_size, 1<<10, 4<<20); - std::string db_absolute_path; - src.env->GetAbsolutePath(dbname, &db_absolute_path); if (result.info_log == NULL) { - // Open a log file in the same directory as the db - src.env->CreateDir(dbname); // In case it does not exist - src.env->RenameFile(InfoLogFileName(dbname, db_absolute_path, - result.db_log_dir), OldInfoLogFileName(dbname,src.env->NowMicros(), - db_absolute_path, result.db_log_dir)); - Status s = src.env->NewLogger(InfoLogFileName(dbname, db_absolute_path, - result.db_log_dir), &result.info_log); + Status s = NewLogger(dbname, result.db_log_dir, src.env, + result.max_log_file_size, &result.info_log); if (!s.ok()) { // No place suitable for logging result.info_log = NULL; @@ -136,6 +158,12 @@ Options SanitizeOptions(const std::string& dbname, if (result.block_cache == NULL) { result.block_cache = NewLRUCache(8 << 20); } + if (src.compression_per_level != NULL) { + result.compression_per_level = new CompressionType[src.num_levels]; + for (unsigned int i = 0; i < src.num_levels; i++) { + result.compression_per_level[i] = src.compression_per_level[i]; + } + } return result; } @@ -163,6 +191,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) disable_delete_obsolete_files_(false), delete_obsolete_files_last_run_(0), stall_level0_slowdown_(0), + stall_leveln_slowdown_(0), stall_memtable_compaction_(0), stall_level0_num_files_(0), started_at_(options.env->NowMicros()), @@ -224,6 +253,9 @@ DBImpl::~DBImpl() { if (owns_cache_) { delete options_.block_cache; } + if (options_.compression_per_level != NULL) { + delete options_.compression_per_level; + } delete logger_; } @@ -1097,7 +1129,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { std::string fname = TableFileName(dbname_, file_number); Status s = env_->NewWritableFile(fname, &compact->outfile); if (s.ok()) { - compact->builder = new TableBuilder(options_, compact->outfile); + compact->builder = new TableBuilder(options_, compact->outfile, + compact->compaction->level() + 1); } return s; } @@ -1656,6 +1689,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { assert(!writers_.empty()); bool allow_delay = !force; Status s; + double score; while (true) { if (!bg_error_.ok()) { @@ -1701,6 +1735,18 @@ Status DBImpl::MakeRoomForWrite(bool force) { Log(options_.info_log, "wait for fewer level0 files...\n"); bg_cv_.Wait(); stall_level0_num_files_ += env_->NowMicros() - t1; + } else if ( + allow_delay && + options_.rate_limit > 1.0 && + (score = versions_->MaxCompactionScore()) > options_.rate_limit) { + // Delay a write when the compaction score for any level is too large. + mutex_.Unlock(); + env_->SleepForMicroseconds(1000); + stall_leveln_slowdown_ += 1000; + allow_delay = false; // Do not delay a single write more than once + Log(options_.info_log, + "delaying write for rate limits with max score %.2f\n", score); + mutex_.Lock(); } else { // Attempt to switch to a new memtable and trigger compaction of old DelayLoggingAndReset(); @@ -1789,8 +1835,9 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { stats_[level].bytes_readnp1 / 1048576.0, bytes_new / 1048576.0, amplify, - bytes_read / 1048576.0 / seconds_up, - stats_[level].bytes_written / 1048576.0 / seconds_up, + (bytes_read / 1048576.0) / (stats_[level].micros / 1000000.0), + (stats_[level].bytes_written / 1048576.0) / + (stats_[level].micros / 1000000.0), stats_[level].files_in_leveln, stats_[level].files_in_levelnp1, stats_[level].files_out_levelnp1, @@ -1801,10 +1848,12 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { } snprintf(buf, sizeof(buf), - "Amplification: %.1f rate, %.2f GB in, %.2f GB out\n", + "Amplification: %.1f rate, %.2f GB in, %.2f GB out, %.2f MB/sec in, %.2f MB/sec out\n", (double) total_bytes / stats_[0].bytes_written, stats_[0].bytes_written / (1048576.0 * 1024), - total_bytes / (1048576.0 * 1024)); + total_bytes / (1048576.0 * 1024), + stats_[0].bytes_written / 1048576.0 / seconds_up, + total_bytes / 1048576.0 / seconds_up); value->append(buf); snprintf(buf, sizeof(buf), "Uptime(secs): %.1f\n", seconds_up); @@ -1812,10 +1861,11 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { snprintf(buf, sizeof(buf), "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " - "%.3f memtable_compaction\n", + "%.3f memtable_compaction, %.3f leveln_slowdown\n", stall_level0_slowdown_ / 1000000.0, stall_level0_num_files_ / 1000000.0, - stall_memtable_compaction_ / 1000000.0); + stall_memtable_compaction_ / 1000000.0, + stall_leveln_slowdown_ / 1000000.0); value->append(buf); return true; diff --git a/db/db_impl.h b/db/db_impl.h index 4d378a31b..0a7f05f93 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -228,6 +228,7 @@ class DBImpl : public DB { uint64_t stall_level0_slowdown_; uint64_t stall_memtable_compaction_; uint64_t stall_level0_num_files_; + uint64_t stall_leveln_slowdown_; // Time at which this instance was started. const uint64_t started_at_; diff --git a/db/db_test.cc b/db/db_test.cc index daea3aaaf..e5a8f8711 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -20,6 +20,24 @@ namespace leveldb { +static bool SnappyCompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Snappy_Compress(in.data(), in.size(), &out); +} + +static bool ZlibCompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Zlib_Compress(in.data(), in.size(), &out); +} + +static bool BZip2CompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::BZip2_Compress(in.data(), in.size(), &out); +} + static std::string RandomString(Random* rnd, int len) { std::string r; test::RandomString(rnd, len, &r); @@ -1058,6 +1076,80 @@ TEST(DBTest, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1), 1); } +void MinLevelHelper(DBTest* self, Options& options) { + Random rnd(301); + + for (int num = 0; + num < options.level0_file_num_compaction_trigger - 1; + num++) + { + std::vector values; + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(self->Put(Key(i), values[i])); + } + self->dbfull()->TEST_WaitForCompactMemTable(); + ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1); + } + + //generate one more file in level-0, and should trigger level-0 compaction + std::vector values; + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(self->Put(Key(i), values[i])); + } + self->dbfull()->TEST_WaitForCompact(); + + ASSERT_EQ(self->NumTableFilesAtLevel(0), 0); + ASSERT_EQ(self->NumTableFilesAtLevel(1), 1); +} + +TEST(DBTest, MinLevelToCompress) { + Options options = CurrentOptions(); + options.write_buffer_size = 100<<10; //100KB + options.num_levels = 3; + options.max_mem_compaction_level = 0; + options.level0_file_num_compaction_trigger = 3; + options.create_if_missing = true; + CompressionType type; + + if (SnappyCompressionSupported()) { + type = kSnappyCompression; + fprintf(stderr, "using snappy\n"); + } else if (ZlibCompressionSupported()) { + type = kZlibCompression; + fprintf(stderr, "using zlib\n"); + } else if (BZip2CompressionSupported()) { + type = kBZip2Compression; + fprintf(stderr, "using bzip2\n"); + } else { + fprintf(stderr, "skipping test, compression disabled\n"); + return; + } + options.compression_per_level = new CompressionType[options.num_levels]; + + // do not compress L0 + for (int i = 0; i < 1; i++) { + options.compression_per_level[i] = kNoCompression; + } + for (int i = 1; i < options.num_levels; i++) { + options.compression_per_level[i] = type; + } + Reopen(&options); + MinLevelHelper(this, options); + + // do not compress L0 and L1 + for (int i = 0; i < 2; i++) { + options.compression_per_level[i] = kNoCompression; + } + for (int i = 2; i < options.num_levels; i++) { + options.compression_per_level[i] = type; + } + DestroyAndReopen(&options); + MinLevelHelper(this, options); +} + TEST(DBTest, RepeatedWritesToSameKey) { Options options = CurrentOptions(); options.env = env_; diff --git a/db/version_set.cc b/db/version_set.cc index 8bd785db5..2a7e90f90 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1231,6 +1231,7 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { void VersionSet::Finalize(Version* v) { + double max_score = 0; for (int level = 0; level < NumberLevels()-1; level++) { double score; if (level == 0) { @@ -1274,11 +1275,17 @@ void VersionSet::Finalize(Version* v) { if (score > 1) { // Log(options_->info_log, "XXX score l%d = %d ", level, (int)score); } + if (max_score < score) { + max_score = score; + } } v->compaction_level_[level] = level; v->compaction_score_[level] = score; } + // update the max compaction score in levels 1 to n-1 + v->max_compaction_score_ = max_score; + // sort all the levels based on their score. Higher scores get listed // first. Use bubble sort because the number of entries are small. for(int i = 0; i < NumberLevels()-2; i++) { diff --git a/db/version_set.h b/db/version_set.h index 4bae0ef57..8dee811ee 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -145,6 +145,7 @@ class Version { // These are used to pick the best compaction level std::vector compaction_score_; std::vector compaction_level_; + double max_compaction_score_; // max score in l1 to ln-1 // The offset in the manifest file where this version is stored. uint64_t offset_manifest_file_; @@ -264,6 +265,11 @@ class VersionSet { NeedsSizeCompaction()); } + // Returns the maxmimum compaction score for levels 1 to max + double MaxCompactionScore() const { + return current_->max_compaction_score_; + } + // Add all files listed in any live version to *live. // May also mutate some internal state. void AddLiveFiles(std::set* live); diff --git a/include/leveldb/env.h b/include/leveldb/env.h index f4284ace4..7d4291656 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -226,9 +226,9 @@ class WritableFile { virtual Status Flush() = 0; virtual Status Sync() = 0; // sync data - /* + /* * Sync data and/or metadata as well. - * By default, sync only metadata. + * By default, sync only metadata. * Override this method for environments where we need to sync * metadata as well. */ @@ -252,11 +252,15 @@ class WritableFile { // An interface for writing log messages. class Logger { public: + enum { DO_NOT_SUPPORT_GET_LOG_FILE_SIZE = -1 }; Logger() { } virtual ~Logger(); // Write an entry to the log file with the specified format. virtual void Logv(const char* format, va_list ap) = 0; + virtual size_t GetLogFileSize() const { + return DO_NOT_SUPPORT_GET_LOG_FILE_SIZE; + } private: // No copying allowed diff --git a/include/leveldb/options.h b/include/leveldb/options.h index ecf81585f..f055a3f7d 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -141,6 +141,20 @@ struct Options { // efficiently detect that and will switch to uncompressed mode. CompressionType compression; + // Different levels can have different compression policies. There + // are cases where most lower levels would like to quick compression + // algorithm while the higher levels (which have more data) use + // compression algorithms that have better compression but could + // be slower. This array, if non NULL, should have an entry for + // each level of the database. This array, if non NULL, overides the + // value specified in the previous field 'compression'. The caller is + // reponsible for allocating memory and initializing the values in it + // before invoking Open(). The caller is responsible for freeing this + // array and it could be freed anytime after the return from Open(). + // This could have been a std::vector but that makes the equivalent + // java/C api hard to construct. + CompressionType* compression_per_level; + // If non-NULL, use the specified filter policy to reduce disk reads. // Many applications will benefit from passing the result of // NewBloomFilterPolicy() here. @@ -257,6 +271,17 @@ struct Options { // Default: 1 int max_background_compactions; + // Specify the maximal size of the info log file. If the log file + // is larger than `max_log_file_size`, a new info log file will + // be created. + // If max_log_file_size == 0, all logs will be written to one + // log file. + size_t max_log_file_size; + + // Puts are delayed when any level has a compaction score that + // exceeds rate_limit. This is ignored when <= 1.0. + double rate_limit; + // Create an Options object with default values for all fields. Options(); diff --git a/include/leveldb/table_builder.h b/include/leveldb/table_builder.h index 5fd1dc71f..5c755137e 100644 --- a/include/leveldb/table_builder.h +++ b/include/leveldb/table_builder.h @@ -27,8 +27,10 @@ class TableBuilder { public: // Create a builder that will store the contents of the table it is // building in *file. Does not close the file. It is up to the - // caller to close the file after calling Finish(). - TableBuilder(const Options& options, WritableFile* file); + // caller to close the file after calling Finish(). The output file + // will be part of level specified by 'level'. A value of -1 means + // that the caller does not know which level the output file will reside. + TableBuilder(const Options& options, WritableFile* file, int level=-1); // REQUIRES: Either Finish() or Abandon() has been called. ~TableBuilder(); @@ -81,6 +83,7 @@ class TableBuilder { struct Rep; Rep* rep_; + int level_; // No copying allowed TableBuilder(const TableBuilder&); diff --git a/table/table_builder.cc b/table/table_builder.cc index eabdedead..5306b2976 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -60,8 +60,9 @@ struct TableBuilder::Rep { } }; -TableBuilder::TableBuilder(const Options& options, WritableFile* file) - : rep_(new Rep(options, file)) { +TableBuilder::TableBuilder(const Options& options, WritableFile* file, + int level) + : rep_(new Rep(options, file)), level_(level) { if (rep_->filter_block != NULL) { rep_->filter_block->StartBlock(0); } @@ -152,7 +153,21 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { Slice block_contents; std::string* compressed = &r->compressed_output; - CompressionType type = r->options.compression; + CompressionType type; + // If the use has specified a different compression level for each level, + // then pick the compresison for that level. + if (r->options.compression_per_level != NULL) { + if (level_ == -1) { + // this is mostly for backward compatibility. The builder does not + // know which level this file belongs to. Apply the compression level + // specified for level 0 to all levels. + type = r->options.compression_per_level[0]; + } else { + type = r->options.compression_per_level[level_]; + } + } else { + type = r->options.compression; + } switch (type) { case kNoCompression: block_contents = raw; diff --git a/util/auto_split_logger.h b/util/auto_split_logger.h new file mode 100644 index 000000000..89429b0b4 --- /dev/null +++ b/util/auto_split_logger.h @@ -0,0 +1,81 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Logger implementation that can be shared by all environments +// where enough posix functionality is available. + +#ifndef STORAGE_LEVELDB_UTIL_SPLIT_LOGGER_LOGGER_H_ +#define STORAGE_LEVELDB_UTIL_SPLIT_LOGGER_LOGGER_H_ + +#include "util/posix_logger.h" +#include "db/filename.h" + +namespace leveldb { + +// AutoSplitLogger can automatically create a new log file +// if the file size exceeds the limit. +// +// The template parameter "UnderlyingLogger" can be any Logger class +// that has the method "GetLogFileSize()" and "ResetFile()" +template +class AutoSplitLogger : public Logger { + private: + std::string log_fname_; // Current active info log's file name. + std::string dbname_; + std::string db_log_dir_; + std::string db_absolute_path_; + Env* env_; + UnderlyingLogger* logger_; + const size_t MAX_LOG_FILE_SIZE; + Status status_; + + public: + AutoSplitLogger(Env* env, const std::string& dbname, + const std::string& db_log_dir, size_t log_max_size): + env_(env), dbname_(dbname), db_log_dir_(db_log_dir), + MAX_LOG_FILE_SIZE(log_max_size), status_(Status::OK()) { + env->GetAbsolutePath(dbname, &db_absolute_path_); + log_fname_ = InfoLogFileName(dbname_, db_absolute_path_, db_log_dir_); + InitLogger(); + } + ~AutoSplitLogger() { delete logger_; } + + virtual void Logv(const char* format, va_list ap) { + assert(GetStatus().ok()); + + logger_->Logv(format, ap); + // Check if the log file should be splitted. + if (logger_->GetLogFileSize() > MAX_LOG_FILE_SIZE) { + delete logger_; + std::string old_fname = OldInfoLogFileName( + dbname_, env_->NowMicros(), db_absolute_path_, db_log_dir_); + env_->RenameFile(log_fname_, old_fname); + InitLogger(); + } + } + + // check if the logger has any problem. + Status GetStatus() { + return status_; + } + + private: + Status InitLogger() { + status_ = env_->NewLogger(log_fname_, &logger_); + if (!status_.ok()) { + logger_ = NULL; + } + if (logger_->GetLogFileSize() == + Logger::DO_NOT_SUPPORT_GET_LOG_FILE_SIZE) { + status_ = Status::NotSupported( + "The underlying logger doesn't support GetLogFileSize()"); + } + return status_; + } + +}; // class AutoSplitLogger + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_UTIL_SPLIT_LOGGER_LOGGER_H_ diff --git a/util/options.cc b/util/options.cc index b48b068f3..6a5c36e25 100644 --- a/util/options.cc +++ b/util/options.cc @@ -25,6 +25,7 @@ Options::Options() block_size(4096), block_restart_interval(16), compression(kSnappyCompression), + compression_per_level(NULL), filter_policy(NULL), num_levels(7), level0_file_num_compaction_trigger(4), @@ -44,7 +45,9 @@ Options::Options() db_log_dir(""), disable_seek_compaction(false), delete_obsolete_files_period_micros(0), - max_background_compactions(1) { + max_background_compactions(1), + max_log_file_size(0), + rate_limit(0.0) { } void @@ -70,7 +73,21 @@ Options::Dump( Log(log," Options.num_levels: %d", num_levels); Log(log," Options.disableDataSync: %d", disableDataSync); Log(log," Options.use_fsync: %d", use_fsync); - Log(log," Options.db_stats_log_interval: %d", + if (compression_per_level != NULL) { + for (unsigned int i = 0; i < num_levels; i++){ + Log(log," Options.compression[%d]: %d", + i, compression_per_level[i]); + } + } else { + Log(log," Options.compression: %d", compression); + } + Log(log," Options.filter_policy: %s", + filter_policy == NULL ? "NULL" : filter_policy->Name()); + Log(log," Options.num_levels: %d", num_levels); + Log(log," Options.disableDataSync: %d", disableDataSync); + Log(log," Options.use_fsync: %d", use_fsync); + Log(log," Options.max_log_file_size: %d", max_log_file_size); + Log(log," Options.db_stats_log_interval: %d", db_stats_log_interval); Log(log," Options.level0_file_num_compaction_trigger: %d", level0_file_num_compaction_trigger); @@ -100,6 +117,8 @@ Options::Dump( delete_obsolete_files_period_micros); Log(log," Options.max_background_compactions: %d", max_background_compactions); + Log(log," Options.rate_limit: %.2f", + rate_limit); } // Options::Dump diff --git a/util/posix_logger.h b/util/posix_logger.h index 9741b1afa..513528314 100644 --- a/util/posix_logger.h +++ b/util/posix_logger.h @@ -20,8 +20,11 @@ class PosixLogger : public Logger { private: FILE* file_; uint64_t (*gettid_)(); // Return the thread id for the current thread + + size_t log_size_; public: - PosixLogger(FILE* f, uint64_t (*gettid)()) : file_(f), gettid_(gettid) { } + PosixLogger(FILE* f, uint64_t (*gettid)()) : + file_(f), gettid_(gettid), log_size_(0) { } virtual ~PosixLogger() { fclose(file_); } @@ -85,12 +88,17 @@ class PosixLogger : public Logger { assert(p <= limit); fwrite(base, 1, p - base, file_); fflush(file_); + log_size_ += (p - base); + if (base != buffer) { delete[] base; } break; } } + size_t GetLogFileSize() const { + return log_size_; + } }; } // namespace leveldb