diff --git a/AUTHORS b/AUTHORS old mode 100644 new mode 100755 diff --git a/LICENSE b/LICENSE old mode 100644 new mode 100755 diff --git a/Makefile b/Makefile old mode 100644 new mode 100755 diff --git a/README b/README old mode 100644 new mode 100755 diff --git a/TODO b/TODO old mode 100644 new mode 100755 index e17dfdb78..2f848b863 --- a/TODO +++ b/TODO @@ -1,11 +1,3 @@ -Before adding to chrome ------------------------ -- multi-threaded test/benchmark -- Allow missing crc32c in Table format? - -Maybe afterwards ----------------- - ss - Stats diff --git a/db/builder.cc b/db/builder.cc old mode 100644 new mode 100755 diff --git a/db/builder.h b/db/builder.h old mode 100644 new mode 100755 diff --git a/db/corruption_test.cc b/db/corruption_test.cc old mode 100644 new mode 100755 index de9408c3a..63d8d8bca --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -8,6 +8,7 @@ #include #include #include +#include "leveldb/cache.h" #include "leveldb/env.h" #include "leveldb/table.h" #include "leveldb/write_batch.h" @@ -28,10 +29,12 @@ class CorruptionTest { test::ErrorEnv env_; Random rnd_; std::string dbname_; + Cache* tiny_cache_; Options options_; DB* db_; CorruptionTest() : rnd_(test::RandomSeed()) { + tiny_cache_ = NewLRUCache(100); options_.env = &env_; dbname_ = test::TmpDir() + "/db_test"; DestroyDB(dbname_, options_); @@ -45,6 +48,7 @@ class CorruptionTest { ~CorruptionTest() { delete db_; DestroyDB(dbname_, Options()); + delete tiny_cache_; } Status TryReopen(Options* options = NULL) { @@ -52,6 +56,7 @@ class CorruptionTest { db_ = NULL; Options opt = (options ? *options : options_); opt.env = &env_; + opt.block_cache = tiny_cache_; return DB::Open(opt, dbname_, &db_); } @@ -160,12 +165,15 @@ class CorruptionTest { ASSERT_TRUE(s.ok()) << s.ToString(); } - uint64_t Property(const std::string& name) { - uint64_t result; - if (!db_->GetProperty(name, &result)) { - result = ~static_cast(0); + int Property(const std::string& name) { + std::string property; + int result; + if (db_->GetProperty(name, &property) && + sscanf(property.c_str(), "%d", &result) == 1) { + return result; + } else { + return -1; } - return result; } // Return the ith key @@ -235,7 +243,7 @@ TEST(CorruptionTest, TableFileIndexData) { dbi->TEST_CompactRange(0, "", "~"); dbi->TEST_CompactRange(1, "", "~"); - Corrupt(kTableFile, -1000, 500); + Corrupt(kTableFile, -2000, 500); Reopen(); Check(5000, 9999); } @@ -327,6 +335,7 @@ TEST(CorruptionTest, CompactionInputError) { TEST(CorruptionTest, CompactionInputErrorParanoid) { Options options; options.paranoid_checks = true; + options.write_buffer_size = 1048576; Reopen(&options); Build(10); diff --git a/db/db_bench.cc b/db/db_bench.cc old mode 100644 new mode 100755 index 411493cc9..849ebfa66 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -31,11 +31,8 @@ // sha1 -- repeated SHA1 computation over 4K of data // Meta operations: // compact -- Compact the entire DB +// stats -- Print DB stats // heapprofile -- Dump a heap profile (if supported by this port) -// sync -- switch to synchronous writes (not the default) -// nosync -- switch to asynchronous writes (the default) -// tenth -- divide N by 10 (i.e., following benchmarks are smaller) -// normal -- reset N back to its normal value (1000000) static const char* FLAGS_benchmarks = "fillseq," "fillsync," @@ -51,7 +48,9 @@ static const char* FLAGS_benchmarks = "readreverse," "fill100K," "crc32c," - "sha1" + "sha1," + "snappycomp," + "snappyuncomp," ; // Number of key/values to place in database @@ -68,7 +67,12 @@ static double FLAGS_compression_ratio = 0.5; static bool FLAGS_histogram = false; // Number of bytes to buffer in memtable before compacting -static int FLAGS_write_buffer_size = 1 << 20; +// (initialized to default value by "main") +static int FLAGS_write_buffer_size = 0; + +// Number of bytes to use as a cache of uncompressed data. +// Negative means use default settings. +static int FLAGS_cache_size = -1; namespace leveldb { @@ -129,6 +133,7 @@ class Benchmark { double last_op_finish_; int64_t bytes_; std::string message_; + std::string post_message_; Histogram hist_; RandomGenerator gen_; Random rand_; @@ -146,7 +151,8 @@ class Benchmark { static_cast(FLAGS_value_size * FLAGS_compression_ratio + 0.5)); fprintf(stdout, "Entries: %d\n", num_); fprintf(stdout, "RawSize: %.1f MB (estimated)\n", - (((kKeySize + FLAGS_value_size) * num_) / 1048576.0)); + ((static_cast(kKeySize + FLAGS_value_size) * num_) + / 1048576.0)); fprintf(stdout, "FileSize: %.1f MB (estimated)\n", (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_) / 1048576.0)); @@ -164,6 +170,15 @@ class Benchmark { fprintf(stdout, "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n"); #endif + + // See if snappy is working by attempting to compress a compressible string + const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"; + std::string compressed; + if (!port::Snappy_Compress(text, sizeof(text), &compressed)) { + fprintf(stdout, "WARNING: Snappy compression is not enabled\n"); + } else if (compressed.size() >= sizeof(text)) { + fprintf(stdout, "WARNING: Snappy compression is not effective\n"); + } } void PrintEnvironment() { @@ -225,15 +240,13 @@ class Benchmark { done_++; if (done_ >= next_report_) { - if (next_report_ < 1000) { - next_report_ += 100; - } else if (next_report_ < 10000) { - next_report_ += 1000; - } else if (next_report_ < 100000) { - next_report_ += 10000; - } else { - next_report_ += 100000; - } + if (next_report_ < 1000) next_report_ += 100; + else if (next_report_ < 5000) next_report_ += 500; + else if (next_report_ < 10000) next_report_ += 1000; + else if (next_report_ < 50000) next_report_ += 5000; + else if (next_report_ < 100000) next_report_ += 10000; + else if (next_report_ < 500000) next_report_ += 50000; + else next_report_ += 100000; fprintf(stderr, "... finished %d ops%30s\r", done_, ""); fflush(stderr); } @@ -248,7 +261,7 @@ class Benchmark { if (bytes_ > 0) { char rate[100]; - snprintf(rate, sizeof(rate), "%5.1f MB/s", + snprintf(rate, sizeof(rate), "%6.1f MB/s", (bytes_ / 1048576.0) / (finish - start_)); if (!message_.empty()) { message_ = std::string(rate) + " " + message_; @@ -266,6 +279,11 @@ class Benchmark { fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); } fflush(stdout); + + if (!post_message_.empty()) { + fprintf(stdout, "\n%s\n", post_message_.c_str()); + post_message_.clear(); + } } public: @@ -278,12 +296,13 @@ class Benchmark { EXISTING }; - Benchmark() : cache_(NewLRUCache(200<<20)), - db_(NULL), - num_(FLAGS_num), - heap_counter_(0), - bytes_(0), - rand_(301) { + Benchmark() + : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL), + db_(NULL), + num_(FLAGS_num), + heap_counter_(0), + bytes_(0), + rand_(301) { std::vector files; Env::Default()->GetChildren("/tmp/dbbench", &files); for (int i = 0; i < files.size(); i++) { @@ -318,36 +337,54 @@ class Benchmark { Start(); WriteOptions write_options; - write_options.sync = false; + bool known = true; if (name == Slice("fillseq")) { - Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size); + Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size, 1); + } else if (name == Slice("fillbatch")) { + Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size, 1000); } else if (name == Slice("fillrandom")) { - Write(write_options, RANDOM, FRESH, num_, FLAGS_value_size); + Write(write_options, RANDOM, FRESH, num_, FLAGS_value_size, 1); } else if (name == Slice("overwrite")) { - Write(write_options, RANDOM, EXISTING, num_, FLAGS_value_size); + Write(write_options, RANDOM, EXISTING, num_, FLAGS_value_size, 1); } else if (name == Slice("fillsync")) { write_options.sync = true; - Write(write_options, RANDOM, FRESH, num_ / 100, FLAGS_value_size); + Write(write_options, RANDOM, FRESH, num_ / 100, FLAGS_value_size, 1); } else if (name == Slice("fill100K")) { - Write(write_options, RANDOM, FRESH, num_ / 1000, 100 * 1000); + Write(write_options, RANDOM, FRESH, num_ / 1000, 100 * 1000, 1); } else if (name == Slice("readseq")) { ReadSequential(); } else if (name == Slice("readreverse")) { ReadReverse(); } else if (name == Slice("readrandom")) { ReadRandom(); + } else if (name == Slice("readrandomsmall")) { + int n = num_; + num_ /= 1000; + ReadRandom(); + num_ = n; } else if (name == Slice("compact")) { Compact(); } else if (name == Slice("crc32c")) { Crc32c(4096, "(4K per op)"); } else if (name == Slice("sha1")) { SHA1(4096, "(4K per op)"); + } else if (name == Slice("snappycomp")) { + SnappyCompress(); + } else if (name == Slice("snappyuncomp")) { + SnappyUncompress(); } else if (name == Slice("heapprofile")) { HeapProfile(); + } else if (name == Slice("stats")) { + PrintStats(); } else { - fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str()); + known = false; + if (name != Slice()) { // No error message for empty name + fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str()); + } + } + if (known) { + Stop(name); } - Stop(name); } } @@ -387,11 +424,54 @@ class Benchmark { message_ = label; } + void SnappyCompress() { + Slice input = gen_.Generate(Options().block_size); + int64_t bytes = 0; + int64_t produced = 0; + bool ok = true; + std::string compressed; + while (ok && bytes < 1024 * 1048576) { // Compress 1G + ok = port::Snappy_Compress(input.data(), input.size(), &compressed); + produced += compressed.size(); + bytes += input.size(); + FinishedSingleOp(); + } + + if (!ok) { + message_ = "(snappy failure)"; + } else { + char buf[100]; + snprintf(buf, sizeof(buf), "(output: %.1f%%)", + (produced * 100.0) / bytes); + message_ = buf; + bytes_ = bytes; + } + } + + void SnappyUncompress() { + Slice input = gen_.Generate(Options().block_size); + std::string compressed; + bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed); + int64_t bytes = 0; + std::string uncompressed; + while (ok && bytes < 1024 * 1048576) { // Compress 1G + ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), + &uncompressed); + bytes += uncompressed.size(); + FinishedSingleOp(); + } + + if (!ok) { + message_ = "(snappy failure)"; + } else { + bytes_ = bytes; + } + } + void Open() { assert(db_ == NULL); Options options; options.create_if_missing = true; - options.max_open_files = 10000; options.block_cache = cache_; options.write_buffer_size = FLAGS_write_buffer_size; Status s = DB::Open(options, "/tmp/dbbench", &db_); @@ -402,7 +482,7 @@ class Benchmark { } void Write(const WriteOptions& options, Order order, DBState state, - int num_entries, int value_size) { + int num_entries, int value_size, int entries_per_batch) { if (state == FRESH) { delete db_; db_ = NULL; @@ -420,19 +500,21 @@ class Benchmark { WriteBatch batch; Status s; std::string val; - for (int i = 0; i < num_entries; i++) { - const int k = (order == SEQUENTIAL) ? i : (rand_.Next() % FLAGS_num); - char key[100]; - snprintf(key, sizeof(key), "%016d", k); + for (int i = 0; i < num_entries; i += entries_per_batch) { batch.Clear(); - batch.Put(key, gen_.Generate(value_size)); + for (int j = 0; j < entries_per_batch; j++) { + const int k = (order == SEQUENTIAL) ? i+j : (rand_.Next() % FLAGS_num); + char key[100]; + snprintf(key, sizeof(key), "%016d", k); + batch.Put(key, gen_.Generate(value_size)); + bytes_ += value_size + strlen(key); + FinishedSingleOp(); + } s = db_->Write(options, &batch); - bytes_ += value_size + strlen(key); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); } - FinishedSingleOp(); } } @@ -475,10 +557,10 @@ class Benchmark { dbi->TEST_CompactMemTable(); int max_level_with_files = 1; for (int level = 1; level < config::kNumLevels; level++) { - uint64_t v; + std::string property; char name[100]; snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level); - if (db_->GetProperty(name, &v) && v > 0) { + if (db_->GetProperty(name, &property) && atoi(property.c_str()) > 0) { max_level_with_files = level; } } @@ -487,6 +569,15 @@ class Benchmark { } } + void PrintStats() { + std::string stats; + if (!db_->GetProperty("leveldb.stats", &stats)) { + message_ = "(failed)"; + } else { + post_message_ = stats; + } + } + static void WriteToFile(void* arg, const char* buf, int n) { reinterpret_cast(arg)->Append(Slice(buf, n)); } @@ -512,6 +603,7 @@ class Benchmark { } int main(int argc, char** argv) { + FLAGS_write_buffer_size = leveldb::Options().write_buffer_size; for (int i = 1; i < argc; i++) { double d; int n; @@ -529,7 +621,9 @@ int main(int argc, char** argv) { FLAGS_value_size = n; } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { FLAGS_write_buffer_size = n; - } else { + } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) { + FLAGS_cache_size = n; + } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); } diff --git a/db/db_impl.cc b/db/db_impl.cc old mode 100644 new mode 100755 index cf5471b28..d01223682 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -104,6 +104,9 @@ Options SanitizeOptions(const std::string& dbname, result.info_log = new NullWritableFile; } } + if (result.block_cache == NULL) { + result.block_cache = NewLRUCache(8 << 20); + } return result; } @@ -112,18 +115,20 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) internal_comparator_(options.comparator), options_(SanitizeOptions(dbname, &internal_comparator_, options)), owns_info_log_(options_.info_log != options.info_log), + owns_cache_(options_.block_cache != options.block_cache), dbname_(dbname), db_lock_(NULL), shutting_down_(NULL), bg_cv_(&mutex_), compacting_cv_(&mutex_), - last_sequence_(0), mem_(new MemTable(internal_comparator_)), + imm_(NULL), logfile_(NULL), log_(NULL), - log_number_(0), bg_compaction_scheduled_(false), compacting_(false) { + has_imm_.Release_Store(NULL); + // Reserve ten files or so for other uses and give the rest to TableCache. const int table_cache_size = options.max_open_files - 10; table_cache_ = new TableCache(dbname_, &options_, table_cache_size); @@ -149,6 +154,7 @@ DBImpl::~DBImpl() { delete versions_; delete mem_; + delete imm_; delete log_; delete logfile_; delete table_cache_; @@ -156,15 +162,15 @@ DBImpl::~DBImpl() { if (owns_info_log_) { delete options_.info_log; } + if (owns_cache_) { + delete options_.block_cache; + } } Status DBImpl::NewDB() { - assert(log_number_ == 0); - assert(last_sequence_ == 0); - VersionEdit new_db; new_db.SetComparatorName(user_comparator()->Name()); - new_db.SetLogNumber(log_number_); + new_db.SetLogNumber(0); new_db.SetNextFile(2); new_db.SetLastSequence(0); @@ -193,15 +199,6 @@ Status DBImpl::NewDB() { return s; } -Status DBImpl::Install(VersionEdit* edit, - uint64_t new_log_number, - MemTable* cleanup_mem) { - mutex_.AssertHeld(); - edit->SetLogNumber(new_log_number); - edit->SetLastSequence(last_sequence_); - return versions_->LogAndApply(edit, cleanup_mem); -} - void DBImpl::MaybeIgnoreError(Status* s) const { if (s->ok() || options_.paranoid_checks) { // No change needed @@ -216,7 +213,7 @@ void DBImpl::DeleteObsoleteFiles() { std::set live = pending_outputs_; versions_->AddLiveFiles(&live); - versions_->CleanupLargeValueRefs(live, log_number_); + versions_->CleanupLargeValueRefs(live); std::vector filenames; env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose @@ -228,7 +225,8 @@ void DBImpl::DeleteObsoleteFiles() { bool keep = true; switch (type) { case kLogFile: - keep = (number == log_number_); + keep = ((number == versions_->LogNumber()) || + (number == versions_->PrevLogNumber())); break; case kDescriptorFile: // Keep my manifest file, and any newer incarnations' @@ -296,16 +294,20 @@ Status DBImpl::Recover(VersionEdit* edit) { } } - s = versions_->Recover(&log_number_, &last_sequence_); + s = versions_->Recover(); if (s.ok()) { - // Recover from the log file named in the descriptor + // Recover from the log files named in the descriptor SequenceNumber max_sequence(0); - if (log_number_ != 0) { // log_number_ == 0 indicates initial empty state - s = RecoverLogFile(log_number_, edit, &max_sequence); + if (versions_->PrevLogNumber() != 0) { // log#==0 means no prev log + s = RecoverLogFile(versions_->PrevLogNumber(), edit, &max_sequence); + } + if (s.ok() && versions_->LogNumber() != 0) { // log#==0 for initial state + s = RecoverLogFile(versions_->LogNumber(), edit, &max_sequence); } if (s.ok()) { - last_sequence_ = - last_sequence_ > max_sequence ? last_sequence_ : max_sequence; + if (versions_->LastSequence() < max_sequence) { + versions_->SetLastSequence(max_sequence); + } } } @@ -407,56 +409,58 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) { mutex_.AssertHeld(); + const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; meta.number = versions_->NewFileNumber(); pending_outputs_.insert(meta.number); Iterator* iter = mem->NewIterator(); Log(env_, options_.info_log, "Level-0 table #%llu: started", (unsigned long long) meta.number); - Status s = BuildTable(dbname_, env_, options_, table_cache_, - iter, &meta, edit); + + Status s; + { + mutex_.Unlock(); + s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, edit); + mutex_.Lock(); + } + Log(env_, options_.info_log, "Level-0 table #%llu: %lld bytes %s", (unsigned long long) meta.number, (unsigned long long) meta.file_size, s.ToString().c_str()); delete iter; pending_outputs_.erase(meta.number); + + CompactionStats stats; + stats.micros = env_->NowMicros() - start_micros; + stats.bytes_written = meta.file_size; + stats_[0].Add(stats); return s; } Status DBImpl::CompactMemTable() { mutex_.AssertHeld(); - - WritableFile* lfile = NULL; - uint64_t new_log_number = versions_->NewFileNumber(); - - VersionEdit edit; + assert(imm_ != NULL); + assert(compacting_); // Save the contents of the memtable as a new Table - Status s = WriteLevel0Table(mem_, &edit); - if (s.ok()) { - s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); - } + VersionEdit edit; + Status s = WriteLevel0Table(imm_, &edit); - // Save a new descriptor with the new table and log number. + // Replace immutable memtable with the generated Table if (s.ok()) { - s = Install(&edit, new_log_number, mem_); + edit.SetPrevLogNumber(0); + s = versions_->LogAndApply(&edit, imm_); } if (s.ok()) { // Commit to the new state - mem_ = new MemTable(internal_comparator_); - delete log_; - delete logfile_; - logfile_ = lfile; - log_ = new log::Writer(lfile); - log_number_ = new_log_number; + imm_ = NULL; + has_imm_.Release_Store(NULL); DeleteObsoleteFiles(); - MaybeScheduleCompaction(); - } else { - delete lfile; - env_->DeleteFile(LogFileName(dbname_, new_log_number)); } + + compacting_cv_.SignalAll(); // Wake up waiter even if there was an error return s; } @@ -485,7 +489,17 @@ void DBImpl::TEST_CompactRange( Status DBImpl::TEST_CompactMemTable() { MutexLock l(&mutex_); - return CompactMemTable(); + Status s = MakeRoomForWrite(true /* force compaction */); + if (s.ok()) { + // Wait until the compaction completes + while (imm_ != NULL && bg_error_.ok()) { + compacting_cv_.Wait(); + } + if (imm_ != NULL) { + s = bg_error_; + } + } + return s; } void DBImpl::MaybeScheduleCompaction() { @@ -496,7 +510,7 @@ void DBImpl::MaybeScheduleCompaction() { // Some other thread is running a compaction. Do not conflict with it. } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions - } else if (!versions_->NeedsCompaction()) { + } else if (imm_ == NULL && !versions_->NeedsCompaction()) { // No work to be done } else { bg_compaction_scheduled_ = true; @@ -525,6 +539,16 @@ void DBImpl::BackgroundCall() { void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); + assert(!compacting_); + + if (imm_ != NULL) { + compacting_ = true; + CompactMemTable(); + compacting_ = false; + compacting_cv_.SignalAll(); + return; + } + Compaction* c = versions_->PickCompaction(); if (c == NULL) { // Nothing to do @@ -539,7 +563,7 @@ void DBImpl::BackgroundCompaction() { c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); - status = Install(c->edit(), log_number_, NULL); + status = versions_->LogAndApply(c->edit(), NULL); Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s\n", static_cast(f->number), c->level() + 1, @@ -680,7 +704,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { } compact->outputs.clear(); - Status s = Install(compact->compaction->edit(), log_number_, NULL); + Status s = versions_->LogAndApply(compact->compaction->edit(), NULL); if (s.ok()) { compact->compaction->ReleaseInputs(); DeleteObsoleteFiles(); @@ -694,6 +718,9 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { } Status DBImpl::DoCompactionWork(CompactionState* compact) { + const uint64_t start_micros = env_->NowMicros(); + int64_t imm_micros = 0; // Micros spent doing imm_ compactions + Log(env_, options_.info_log, "Compacting %d@%d + %d@%d files", compact->compaction->num_input_files(0), compact->compaction->level(), @@ -704,7 +731,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { assert(compact->builder == NULL); assert(compact->outfile == NULL); if (snapshots_.empty()) { - compact->smallest_snapshot = last_sequence_; + compact->smallest_snapshot = versions_->LastSequence(); } else { compact->smallest_snapshot = snapshots_.oldest()->number_; } @@ -721,6 +748,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { + // Prioritize immutable compaction work + if (has_imm_.NoBarrier_Load() != NULL) { + const uint64_t imm_start = env_->NowMicros(); + mutex_.Lock(); + if (imm_ != NULL) { + CompactMemTable(); + compacting_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary + } + mutex_.Unlock(); + imm_micros += (env_->NowMicros() - imm_start); + } + Slice key = input->key(); InternalKey tmp_internal_key; tmp_internal_key.DecodeFrom(key); @@ -835,7 +874,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { delete input; input = NULL; + CompactionStats stats; + stats.micros = env_->NowMicros() - start_micros - imm_micros; + for (int which = 0; which < 2; which++) { + for (int i = 0; i < compact->compaction->num_input_files(which); i++) { + stats.bytes_read += compact->compaction->input(which, i)->file_size; + } + } + for (int i = 0; i < compact->outputs.size(); i++) { + stats.bytes_written += compact->outputs[i].file_size; + } + mutex_.Lock(); + stats_[compact->compaction->level() + 1].Add(stats); if (status.ok()) { status = InstallCompactionResults(compact); @@ -848,11 +899,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, SequenceNumber* latest_snapshot) { mutex_.Lock(); - *latest_snapshot = last_sequence_; + *latest_snapshot = versions_->LastSequence(); // Collect together all needed child iterators std::vector list; list.push_back(mem_->NewIterator()); + if (imm_ != NULL) { + list.push_back(imm_->NewIterator()); + } versions_->current()->AddIterators(options, &list); Iterator* internal_iter = NewMergingIterator(&internal_comparator_, &list[0], list.size()); @@ -912,7 +966,7 @@ void DBImpl::Unref(void* arg1, void* arg2) { const Snapshot* DBImpl::GetSnapshot() { MutexLock l(&mutex_); - return snapshots_.New(last_sequence_); + return snapshots_.New(versions_->LastSequence()); } void DBImpl::ReleaseSnapshot(const Snapshot* s) { @@ -935,17 +989,16 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { WriteBatch* final = NULL; { MutexLock l(&mutex_); - if (!bg_error_.ok()) { - status = bg_error_; - } else if (mem_->ApproximateMemoryUsage() > options_.write_buffer_size) { - status = CompactMemTable(); - } + status = MakeRoomForWrite(false); // May temporarily release lock and wait + + uint64_t last_sequence = versions_->LastSequence(); if (status.ok()) { - status = HandleLargeValues(last_sequence_ + 1, updates, &final); + status = HandleLargeValues(last_sequence + 1, updates, &final); } if (status.ok()) { - WriteBatchInternal::SetSequence(final, last_sequence_ + 1); - last_sequence_ += WriteBatchInternal::Count(final); + WriteBatchInternal::SetSequence(final, last_sequence + 1); + last_sequence += WriteBatchInternal::Count(final); + versions_->SetLastSequence(last_sequence); // Add to log and apply to memtable status = log_->AddRecord(WriteBatchInternal::Contents(final)); @@ -959,7 +1012,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { if (options.post_write_snapshot != NULL) { *options.post_write_snapshot = - status.ok() ? snapshots_.New(last_sequence_) : NULL; + status.ok() ? snapshots_.New(last_sequence) : NULL; } } if (final != updates) { @@ -969,6 +1022,54 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { return status; } +Status DBImpl::MakeRoomForWrite(bool force) { + mutex_.AssertHeld(); + Status s; + while (true) { + if (!bg_error_.ok()) { + // Yield previous error + s = bg_error_; + break; + } else if (!force && + (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { + // There is room in current memtable + break; + } else if (imm_ != NULL) { + // We have filled up the current memtable, but the previous + // one is still being compacted, so we wait. + compacting_cv_.Wait(); + } else { + // Attempt to switch to a new memtable and trigger compaction of old + assert(versions_->PrevLogNumber() == 0); + uint64_t new_log_number = versions_->NewFileNumber(); + WritableFile* lfile = NULL; + s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); + if (!s.ok()) { + break; + } + VersionEdit edit; + edit.SetPrevLogNumber(versions_->LogNumber()); + edit.SetLogNumber(new_log_number); + s = versions_->LogAndApply(&edit, NULL); + if (!s.ok()) { + delete lfile; + env_->DeleteFile(LogFileName(dbname_, new_log_number)); + break; + } + delete log_; + delete logfile_; + logfile_ = lfile; + log_ = new log::Writer(lfile); + imm_ = mem_; + has_imm_.Release_Store(imm_); + mem_ = new MemTable(internal_comparator_); + force = false; // Do not force another compaction if have room + MaybeScheduleCompaction(); + } + } + return s; +} + bool DBImpl::HasLargeValues(const WriteBatch& batch) const { if (WriteBatchInternal::ByteSize(&batch) >= options_.large_value_threshold) { for (WriteBatchInternal::Iterator it(batch); !it.Done(); it.Next()) { @@ -1033,9 +1134,10 @@ Status DBImpl::HandleLargeValues(SequenceNumber assigned_seq, MaybeCompressLargeValue( it.value(), &file_bytes, &scratch, &large_ref); InternalKey ikey(it.key(), seq, kTypeLargeValueRef); - if (versions_->RegisterLargeValueRef(large_ref, log_number_,ikey)) { + if (versions_->RegisterLargeValueRef( + large_ref, versions_->LogNumber(), ikey)) { // TODO(opt): avoid holding the lock here (but be careful about - // another thread doing a Write and changing log_number_ or + // another thread doing a Write and switching logs or // having us get a different "assigned_seq" value). uint64_t tmp_number = versions_->NewFileNumber(); @@ -1086,7 +1188,9 @@ Status DBImpl::HandleLargeValues(SequenceNumber assigned_seq, return Status::OK(); } -bool DBImpl::GetProperty(const Slice& property, uint64_t* value) { +bool DBImpl::GetProperty(const Slice& property, std::string* value) { + value->clear(); + MutexLock l(&mutex_); Slice in = property; Slice prefix("leveldb."); @@ -1100,10 +1204,37 @@ bool DBImpl::GetProperty(const Slice& property, uint64_t* value) { if (!ok || level < 0 || level >= config::kNumLevels) { return false; } else { - *value = versions_->NumLevelFiles(level); + char buf[100]; + snprintf(buf, sizeof(buf), "%d", versions_->NumLevelFiles(level)); + *value = buf; return true; } + } else if (in == "stats") { + char buf[200]; + snprintf(buf, sizeof(buf), + " Compactions\n" + "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n" + "--------------------------------------------------\n" + ); + value->append(buf); + for (int level = 0; level < config::kNumLevels; level++) { + int files = versions_->NumLevelFiles(level); + if (stats_[level].micros > 0 || files > 0) { + snprintf( + buf, sizeof(buf), + "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", + level, + files, + versions_->NumLevelBytes(level) / 1048576.0, + stats_[level].micros / 1e6, + stats_[level].bytes_read / 1048576.0, + stats_[level].bytes_written / 1048576.0); + value->append(buf); + } + } + return true; } + return false; } @@ -1158,14 +1289,15 @@ Status DB::Open(const Options& options, const std::string& dbname, VersionEdit edit; Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists if (s.ok()) { - impl->log_number_ = impl->versions_->NewFileNumber(); + uint64_t new_log_number = impl->versions_->NewFileNumber(); WritableFile* lfile; - s = options.env->NewWritableFile(LogFileName(dbname, impl->log_number_), + s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), &lfile); if (s.ok()) { + edit.SetLogNumber(new_log_number); impl->logfile_ = lfile; impl->log_ = new log::Writer(lfile); - s = impl->Install(&edit, impl->log_number_, NULL); + s = impl->versions_->LogAndApply(&edit, NULL); } if (s.ok()) { impl->DeleteObsoleteFiles(); diff --git a/db/db_impl.h b/db/db_impl.h old mode 100644 new mode 100755 index 49ac37b9d..1f685f011 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -36,7 +36,7 @@ class DBImpl : public DB { virtual Iterator* NewIterator(const ReadOptions&); virtual const Snapshot* GetSnapshot(); virtual void ReleaseSnapshot(const Snapshot* snapshot); - virtual bool GetProperty(const Slice& property, uint64_t* value); + virtual bool GetProperty(const Slice& property, std::string* value); virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes); // Extra methods (for testing) that are not in the public DB interface @@ -72,14 +72,6 @@ class DBImpl : public DB { // be made to the descriptor are added to *edit. Status Recover(VersionEdit* edit); - // Apply the specified updates and save the resulting descriptor to - // persistent storage. If cleanup_mem is non-NULL, arrange to - // delete it when all existing snapshots have gone away iff Install() - // returns OK. - Status Install(VersionEdit* edit, - uint64_t new_log_number, - MemTable* cleanup_mem); - void MaybeIgnoreError(Status* s) const; // Delete any unneeded files and stale in-memory entries. @@ -99,6 +91,7 @@ class DBImpl : public DB { Status WriteLevel0Table(MemTable* mem, VersionEdit* edit); + Status MakeRoomForWrite(bool force /* compact even if there is room? */); bool HasLargeValues(const WriteBatch& batch) const; // Process data in "*updates" and return a status. "assigned_seq" @@ -141,6 +134,7 @@ class DBImpl : public DB { const InternalKeyComparator internal_comparator_; const Options options_; // options_.comparator == &internal_comparator_ bool owns_info_log_; + bool owns_cache_; const std::string dbname_; // table_cache_ provides its own synchronization @@ -152,13 +146,13 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; port::AtomicPointer shutting_down_; - port::CondVar bg_cv_; // Signalled when !bg_compaction_scheduled_ + port::CondVar bg_cv_; // Signalled when !bg_compaction_scheduled_ port::CondVar compacting_cv_; // Signalled when !compacting_ - SequenceNumber last_sequence_; MemTable* mem_; + MemTable* imm_; // Memtable being compacted + port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_ WritableFile* logfile_; log::Writer* log_; - uint64_t log_number_; SnapshotList snapshots_; // Set of table files to protect from deletion because they are @@ -176,6 +170,23 @@ class DBImpl : public DB { // Have we encountered a background error in paranoid mode? Status bg_error_; + // Per level compaction stats. stats_[level] stores the stats for + // compactions that produced data for the specified "level". + struct CompactionStats { + int64_t micros; + int64_t bytes_read; + int64_t bytes_written; + + CompactionStats() : micros(0), bytes_read(0), bytes_written(0) { } + + void Add(const CompactionStats& c) { + this->micros += c.micros; + this->bytes_read += c.bytes_read; + this->bytes_written += c.bytes_written; + } + }; + CompactionStats stats_[config::kNumLevels]; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_iter.cc b/db/db_iter.cc old mode 100644 new mode 100755 diff --git a/db/db_iter.h b/db/db_iter.h old mode 100644 new mode 100755 diff --git a/db/db_test.cc b/db/db_test.cc old mode 100644 new mode 100755 index f68e7594f..04de3313e --- a/db/db_test.cc +++ b/db/db_test.cc @@ -72,19 +72,11 @@ class DBTest { } Status Put(const std::string& k, const std::string& v) { - WriteOptions options; - options.sync = false; - WriteBatch batch; - batch.Put(k, v); - return db_->Write(options, &batch); + return db_->Put(WriteOptions(), k, v); } Status Delete(const std::string& k) { - WriteOptions options; - options.sync = false; - WriteBatch batch; - batch.Delete(k); - return db_->Write(options, &batch); + return db_->Delete(WriteOptions(), k); } std::string Get(const std::string& k, const Snapshot* snapshot = NULL) { @@ -147,11 +139,11 @@ class DBTest { } int NumTableFilesAtLevel(int level) { - uint64_t val; + std::string property; ASSERT_TRUE( db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level), - &val)); - return val; + &property)); + return atoi(property.c_str()); } uint64_t Size(const Slice& start, const Slice& limit) { @@ -185,10 +177,7 @@ class DBTest { dbfull()->TEST_CompactMemTable(); int max_level_with_files = 1; for (int level = 1; level < config::kNumLevels; level++) { - uint64_t v; - char name[100]; - snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level); - if (dbfull()->GetProperty(name, &v) && v > 0) { + if (NumTableFilesAtLevel(level) > 0) { max_level_with_files = level; } } @@ -459,7 +448,7 @@ TEST(DBTest, MinorCompactionsHappen) { options.write_buffer_size = 10000; Reopen(&options); - const int N = 100; + const int N = 500; int starting_num_tables = NumTableFilesAtLevel(0); for (int i = 0; i < N; i++) { @@ -1047,7 +1036,7 @@ class ModelDB: public DB { return Status::OK(); } - virtual bool GetProperty(const Slice& property, uint64_t* value) { + virtual bool GetProperty(const Slice& property, std::string* value) { return false; } virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes) { diff --git a/db/dbformat.cc b/db/dbformat.cc old mode 100644 new mode 100755 diff --git a/db/dbformat.h b/db/dbformat.h old mode 100644 new mode 100755 index 6f34cd16e..5f117f95f --- a/db/dbformat.h +++ b/db/dbformat.h @@ -15,6 +15,12 @@ namespace leveldb { +// Grouping of constants. We may want to make some of these +// parameters set via options. +namespace config { +static const int kNumLevels = 7; +} + class InternalKey; // Value types encoded as the last component of internal keys. diff --git a/db/dbformat_test.cc b/db/dbformat_test.cc old mode 100644 new mode 100755 diff --git a/db/filename.cc b/db/filename.cc old mode 100644 new mode 100755 diff --git a/db/filename.h b/db/filename.h old mode 100644 new mode 100755 diff --git a/db/filename_test.cc b/db/filename_test.cc old mode 100644 new mode 100755 diff --git a/db/log_format.h b/db/log_format.h old mode 100644 new mode 100755 diff --git a/db/log_reader.cc b/db/log_reader.cc old mode 100644 new mode 100755 diff --git a/db/log_reader.h b/db/log_reader.h old mode 100644 new mode 100755 diff --git a/db/log_test.cc b/db/log_test.cc old mode 100644 new mode 100755 diff --git a/db/log_writer.cc b/db/log_writer.cc old mode 100644 new mode 100755 diff --git a/db/log_writer.h b/db/log_writer.h old mode 100644 new mode 100755 diff --git a/db/memtable.cc b/db/memtable.cc old mode 100644 new mode 100755 diff --git a/db/memtable.h b/db/memtable.h old mode 100644 new mode 100755 diff --git a/db/repair.cc b/db/repair.cc old mode 100644 new mode 100755 diff --git a/db/skiplist.h b/db/skiplist.h old mode 100644 new mode 100755 diff --git a/db/skiplist_test.cc b/db/skiplist_test.cc old mode 100644 new mode 100755 diff --git a/db/snapshot.h b/db/snapshot.h old mode 100644 new mode 100755 diff --git a/db/table_cache.cc b/db/table_cache.cc old mode 100644 new mode 100755 diff --git a/db/table_cache.h b/db/table_cache.h old mode 100644 new mode 100755 diff --git a/db/version_edit.cc b/db/version_edit.cc old mode 100644 new mode 100755 index 809dd825b..689dbe037 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -20,15 +20,18 @@ enum Tag { kDeletedFile = 6, kNewFile = 7, kLargeValueRef = 8, + kPrevLogNumber = 9, }; void VersionEdit::Clear() { comparator_.clear(); log_number_ = 0; + prev_log_number_ = 0; last_sequence_ = 0; next_file_number_ = 0; has_comparator_ = false; has_log_number_ = false; + has_prev_log_number_ = false; has_next_file_number_ = false; has_last_sequence_ = false; deleted_files_.clear(); @@ -45,6 +48,10 @@ void VersionEdit::EncodeTo(std::string* dst) const { PutVarint32(dst, kLogNumber); PutVarint64(dst, log_number_); } + if (has_prev_log_number_) { + PutVarint32(dst, kPrevLogNumber); + PutVarint64(dst, prev_log_number_); + } if (has_next_file_number_) { PutVarint32(dst, kNextFileNumber); PutVarint64(dst, next_file_number_); @@ -142,6 +149,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kPrevLogNumber: + if (GetVarint64(&input, &prev_log_number_)) { + has_prev_log_number_ = true; + } else { + msg = "previous log number"; + } + break; + case kNextFileNumber: if (GetVarint64(&input, &next_file_number_)) { has_next_file_number_ = true; @@ -228,6 +243,10 @@ std::string VersionEdit::DebugString() const { r.append("\n LogNumber: "); AppendNumberTo(&r, log_number_); } + if (has_prev_log_number_) { + r.append("\n PrevLogNumber: "); + AppendNumberTo(&r, prev_log_number_); + } if (has_next_file_number_) { r.append("\n NextFile: "); AppendNumberTo(&r, next_file_number_); diff --git a/db/version_edit.h b/db/version_edit.h old mode 100644 new mode 100755 index 1b7128339..7e417b567 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -39,6 +39,10 @@ class VersionEdit { has_log_number_ = true; log_number_ = num; } + void SetPrevLogNumber(uint64_t num) { + has_prev_log_number_ = true; + prev_log_number_ = num; + } void SetNextFile(uint64_t num) { has_next_file_number_ = true; next_file_number_ = num; @@ -95,10 +99,12 @@ class VersionEdit { std::string comparator_; uint64_t log_number_; + uint64_t prev_log_number_; uint64_t next_file_number_; SequenceNumber last_sequence_; bool has_comparator_; bool has_log_number_; + bool has_prev_log_number_; bool has_next_file_number_; bool has_last_sequence_; diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc old mode 100644 new mode 100755 diff --git a/db/version_set.cc b/db/version_set.cc old mode 100644 new mode 100755 index dc9b4182e..31f79bb60 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -27,16 +27,14 @@ static const int kTargetFileSize = 2 * 1048576; static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize; static double MaxBytesForLevel(int level) { - if (level == 0) { - return 4 * 1048576.0; - } else { - double result = 10 * 1048576.0; - while (level > 1) { - result *= 10; - level--; - } - return result; + // Note: the result for level zero is not really used since we set + // the level-0 compaction threshold based on number of files. + double result = 10 * 1048576.0; // Result for both level-0 and level-1 + while (level > 1) { + result *= 10; + level--; } + return result; } static uint64_t MaxFileSizeForLevel(int level) { @@ -327,6 +325,9 @@ VersionSet::VersionSet(const std::string& dbname, icmp_(*cmp), next_file_number_(2), manifest_file_number_(0), // Filled by Recover() + last_sequence_(0), + log_number_(0), + prev_log_number_(0), descriptor_file_(NULL), descriptor_log_(NULL), current_(new Version(this)), @@ -345,7 +346,19 @@ VersionSet::~VersionSet() { } Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) { + if (edit->has_log_number_) { + assert(edit->log_number_ >= log_number_); + assert(edit->log_number_ < next_file_number_); + } else { + edit->SetLogNumber(log_number_); + } + + if (!edit->has_prev_log_number_) { + edit->SetPrevLogNumber(prev_log_number_); + } + edit->SetNextFile(next_file_number_); + edit->SetLastSequence(last_sequence_); Version* v = new Version(this); { @@ -372,7 +385,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) { } } - // Write new record to log file + // Write new record to MANIFEST log if (s.ok()) { std::string record; edit->EncodeTo(&record); @@ -396,6 +409,8 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) { v->next_ = NULL; current_->next_ = v; current_ = v; + log_number_ = edit->log_number_; + prev_log_number_ = edit->prev_log_number_; } else { delete v; if (!new_manifest_file.empty()) { @@ -406,13 +421,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) { env_->DeleteFile(new_manifest_file); } } - //Log(env_, options_->info_log, "State\n%s", current_->DebugString().c_str()); return s; } -Status VersionSet::Recover(uint64_t* log_number, - SequenceNumber* last_sequence) { +Status VersionSet::Recover() { struct LogReporter : public log::Reader::Reporter { Status* status; virtual void Corruption(size_t bytes, const Status& s) { @@ -439,9 +452,13 @@ Status VersionSet::Recover(uint64_t* log_number, } bool have_log_number = false; + bool have_prev_log_number = false; bool have_next_file = false; bool have_last_sequence = false; uint64_t next_file = 0; + uint64_t last_sequence = 0; + uint64_t log_number = 0; + uint64_t prev_log_number = 0; Builder builder(this, current_); { @@ -467,17 +484,22 @@ Status VersionSet::Recover(uint64_t* log_number, } if (edit.has_log_number_) { - *log_number = edit.log_number_; + log_number = edit.log_number_; have_log_number = true; } + if (edit.has_prev_log_number_) { + prev_log_number = edit.prev_log_number_; + have_prev_log_number = true; + } + if (edit.has_next_file_number_) { next_file = edit.next_file_number_; have_next_file = true; } if (edit.has_last_sequence_) { - *last_sequence = edit.last_sequence_; + last_sequence = edit.last_sequence_; have_last_sequence = true; } } @@ -493,6 +515,10 @@ Status VersionSet::Recover(uint64_t* log_number, } else if (!have_last_sequence) { s = Status::Corruption("no last-sequence-number entry in descriptor"); } + + if (!have_prev_log_number) { + prev_log_number = 0; + } } if (s.ok()) { @@ -508,12 +534,23 @@ Status VersionSet::Recover(uint64_t* log_number, current_ = v; manifest_file_number_ = next_file; next_file_number_ = next_file + 1; + last_sequence_ = last_sequence; + log_number_ = log_number; + prev_log_number_ = prev_log_number; } } return s; } +static int64_t TotalFileSize(const std::vector& files) { + int64_t sum = 0; + for (int i = 0; i < files.size(); i++) { + sum += files[i]->file_size; + } + return sum; +} + Status VersionSet::Finalize(Version* v) { // Precomputed best level for next compaction int best_level = -1; @@ -523,23 +560,24 @@ Status VersionSet::Finalize(Version* v) { for (int level = 0; s.ok() && level < config::kNumLevels-1; level++) { s = SortLevel(v, level); - // Compute the ratio of current size to size limit. - uint64_t level_bytes = 0; - for (int i = 0; i < v->files_[level].size(); i++) { - level_bytes += v->files_[level][i]->file_size; - } - double score = static_cast(level_bytes) / MaxBytesForLevel(level); - + double score; if (level == 0) { - // Level-0 file sizes are going to be often much smaller than - // MaxBytesForLevel(0) since we do not account for compression - // when producing a level-0 file; and too many level-0 files - // increase merging costs. So use a file-count limit for - // level-0 in addition to the byte-count limit. - double count_score = v->files_[level].size() / 4.0; - if (count_score > score) { - score = count_score; - } + // We treat level-0 specially by bounding the number of files + // instead of number of bytes for two reasons: + // + // (1) With larger write-buffer sizes, it is nice not to do too + // many level-0 compactions. + // + // (2) The files in level-0 are merged on every read and + // therefore we wish to avoid too many files when the individual + // file size is small (perhaps because of a small write-buffer + // setting, or very high compression ratios, or lots of + // overwrites/deletions). + score = v->files_[level].size() / 4.0; + } else { + // Compute the ratio of current size to size limit. + const uint64_t level_bytes = TotalFileSize(v->files_[level]); + score = static_cast(level_bytes) / MaxBytesForLevel(level); } if (score > best_score) { @@ -696,8 +734,7 @@ bool VersionSet::RegisterLargeValueRef(const LargeValueRef& large_ref, return is_first; } -void VersionSet::CleanupLargeValueRefs(const std::set& live_tables, - uint64_t log_file_num) { +void VersionSet::CleanupLargeValueRefs(const std::set& live_tables) { for (LargeValueMap::iterator it = large_value_refs_.begin(); it != large_value_refs_.end(); ) { @@ -705,7 +742,8 @@ void VersionSet::CleanupLargeValueRefs(const std::set& live_tables, for (LargeReferencesSet::iterator ref_it = refs->begin(); ref_it != refs->end(); ) { - if (ref_it->first != log_file_num && // Not in log file + if (ref_it->first != log_number_ && // Not in log file + ref_it->first != prev_log_number_ && // Not in prev log live_tables.count(ref_it->first) == 0) { // Not in a live table // No longer live: erase LargeReferencesSet::iterator to_erase = ref_it; @@ -762,12 +800,10 @@ void VersionSet::AddLiveFiles(std::set* live) { } } -static int64_t TotalFileSize(const std::vector& files) { - int64_t sum = 0; - for (int i = 0; i < files.size(); i++) { - sum += files[i]->file_size; - } - return sum; +int64_t VersionSet::NumLevelBytes(int level) const { + assert(level >= 0); + assert(level < config::kNumLevels); + return TotalFileSize(current_->files_[level]); } int64_t VersionSet::MaxNextLevelOverlappingBytes() { diff --git a/db/version_set.h b/db/version_set.h old mode 100644 new mode 100755 index a4199be61..e1c5a4b91 --- a/db/version_set.h +++ b/db/version_set.h @@ -24,12 +24,6 @@ namespace leveldb { -// Grouping of constants. We may want to make some of these -// parameters set via options. -namespace config { -static const int kNumLevels = 7; -} - namespace log { class Writer; } class Compaction; @@ -107,7 +101,7 @@ class VersionSet { Status LogAndApply(VersionEdit* edit, MemTable* cleanup_mem); // Recover the last saved descriptor from persistent storage. - Status Recover(uint64_t* log_number, SequenceNumber* last_sequence); + Status Recover(); // Save current contents to *log Status WriteSnapshot(log::Writer* log); @@ -124,6 +118,25 @@ class VersionSet { // Return the number of Table files at the specified level. int NumLevelFiles(int level) const; + // Return the combined file size of all files at the specified level. + int64_t NumLevelBytes(int level) const; + + // Return the last sequence number. + uint64_t LastSequence() const { return last_sequence_; } + + // Set the last sequence number to s. + void SetLastSequence(uint64_t s) { + assert(s >= last_sequence_); + last_sequence_ = s; + } + + // Return the current log file number. + uint64_t LogNumber() const { return log_number_; } + + // Return the log file number for the log file that is currently + // being compacted, or zero if there is no such log file. + uint64_t PrevLogNumber() const { return prev_log_number_; } + // Pick level and inputs for a new compaction. // Returns NULL if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that @@ -168,9 +181,8 @@ class VersionSet { // Cleanup the large value reference state by eliminating any // references from files that are not includes in either "live_tables" - // or "log_file". - void CleanupLargeValueRefs(const std::set& live_tables, - uint64_t log_file_num); + // or the current log. + void CleanupLargeValueRefs(const std::set& live_tables); // Returns true if a large value with the given reference is live. bool LargeValueIsLive(const LargeValueRef& large_ref); @@ -213,6 +225,9 @@ class VersionSet { const InternalKeyComparator icmp_; uint64_t next_file_number_; uint64_t manifest_file_number_; + uint64_t last_sequence_; + uint64_t log_number_; + uint64_t prev_log_number_; // 0 or backing store for memtable being compacted // Opened lazily WritableFile* descriptor_file_; diff --git a/db/write_batch.cc b/db/write_batch.cc old mode 100644 new mode 100755 diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h old mode 100644 new mode 100755 diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc old mode 100644 new mode 100755 diff --git a/doc/doc.css b/doc/doc.css old mode 100644 new mode 100755 diff --git a/doc/impl.html b/doc/impl.html old mode 100644 new mode 100755 diff --git a/doc/index.html b/doc/index.html old mode 100644 new mode 100755 index e0baf2e7e..2a83fc37a --- a/doc/index.html +++ b/doc/index.html @@ -63,15 +63,12 @@ Example: The database provides Put, Delete, and Get methods to modify/query the database. For example, the following code moves the value stored under key1 to key2. -

   std::string value;
   leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
   if (s.ok()) s = db->Put(leveldb::WriteOptions(), key2, value);
   if (s.ok()) s = db->Delete(leveldb::WriteOptions(), key1);
 
-See important performance note below for how to -speed up writes significantly.

Atomic Updates

@@ -100,6 +97,47 @@ we do not end up erroneously dropping the value entirely. Apart from its atomicity benefits, WriteBatch may also be used to speed up bulk updates by placing lots of individual mutations into the same batch. + +

Synchronous Writes

+By default, each write to leveldb is asynchronous: it +returns after pushing the write from the process into the operating +system. The transfer from operating system memory to the underlying +persistent storage happens asynchronously. The sync flag +can be turned on for a particular write to make the write operation +not return until the data being written has been pushed all the way to +persistent storage. (On Posix systems, this is implemented by calling +either fsync(...) or fdatasync(...) or +msync(..., MS_SYNC) before the write operation returns.) +
+  leveldb::WriteOptions write_options;
+  write_options.sync = true;
+  db->Put(write_options, ...);
+
+Asynchronous writes are often more than a thousand times as fast as +synchronous writes. The downside of asynchronous writes is that a +crash of the machine may cause the last few updates to be lost. Note +that a crash of just the writing process (i.e., not a reboot) will not +cause any loss since even when sync is false, an update +is pushed from the process memory into the operating system before it +is considered done. + +

+Asynchronous writes can often be used safely. For example, when +loading a large amount of data into the database you can handle lost +updates by restarting the bulk load after a crash. A hybrid scheme is +also possible where every Nth write is synchronous, and in the event +of a crash, the bulk load is restarted just after the last synchronous +write finished by the previous run. (The synchronous write can update +a marker that describes where to restart on a crash.) + +

+WriteBatch provides an alternative to asynchronous writes. +Multiple updates may be placed in the same WriteBatch and +applied together using a synchronous write (i.e., +write_options.sync is set to true). The extra cost of +the synchronous write will be amortized across all of the writes in +the batch. +

Concurrency

@@ -289,48 +327,12 @@ version numbers found in the keys to decide how to interpret them. Performance can be tuned by changing the default values of the types defined in leveldb/include/options.h. -

-

Asynchronous Writes

- -By default, each write to leveldb is synchronous: it does -not return until the write has been pushed from memory to persistent -storage. (On Posix systems, this is implemented by calling either -fdatasync(...) or msync(..., MS_SYNC).) -Synchronous writes may be very slow and the synchrony can be -optionally disabled: -
-  leveldb::WriteOptions write_options;
-  write_options.sync = false;
-  db->Put(write_options, ...);
-
-Asynchronous writes are often more than a hundred times as fast as -synchronous writes. The downside of asynchronous writes is that a -crash of the machine may cause the last few updates to be lost. Note -that a crash of just the writing process (i.e., not a reboot) will not -cause any loss since even when sync is false, an update -is pushed from the process memory into the operating system before it -is considered done. - -

-Asynchronous writes can be particularly beneficial when loading a -large amount of data into the database since you can mitigate the -problem of lost updates by restarting the bulk load. A hybrid scheme -is also possible where every Nth write is synchronous, and in the -event of a crash, the bulk load is restarted just after the last -synchronous write finished by the previous run. - -

-WriteBatch provides an alternative to asynchronous writes. -Multiple updates may be placed in the same WriteBatch and -applied together using a synchronous write. The extra cost of the -synchronous write will be amortized across all of the writes in the batch. -

Block size

leveldb groups adjacent keys together into the same block and such a block is the unit of transfer to and from persistent storage. The -default block size is approximately 8192 uncompressed bytes. +default block size is approximately 4096 uncompressed bytes. Applications that mostly do bulk scans over the contents of the database may wish to increase this size. Applications that do a lot of point reads of small values may wish to switch to a smaller block diff --git a/doc/log_format.txt b/doc/log_format.txt old mode 100644 new mode 100755 diff --git a/doc/table_format.txt b/doc/table_format.txt old mode 100644 new mode 100755 diff --git a/include/leveldb/cache.h b/include/leveldb/cache.h old mode 100644 new mode 100755 diff --git a/include/leveldb/comparator.h b/include/leveldb/comparator.h old mode 100644 new mode 100755 diff --git a/include/leveldb/db.h b/include/leveldb/db.h old mode 100644 new mode 100755 index 74d50d38a..f18ded347 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -13,7 +13,7 @@ namespace leveldb { static const int kMajorVersion = 1; -static const int kMinorVersion = 0; +static const int kMinorVersion = 1; struct Options; struct ReadOptions; @@ -49,7 +49,7 @@ class DB { // Set the database entry for "key" to "value". Returns OK on success, // and a non-OK status on error. - // Note: consider setting options.sync = false. + // Note: consider setting options.sync = true. virtual Status Put(const WriteOptions& options, const Slice& key, const Slice& value) = 0; @@ -57,12 +57,12 @@ class DB { // Remove the database entry (if any) for "key". Returns OK on // success, and a non-OK status on error. It is not an error if "key" // did not exist in the database. - // Note: consider setting options.sync = false. + // Note: consider setting options.sync = true. virtual Status Delete(const WriteOptions& options, const Slice& key) = 0; // Apply the specified updates to the database. // Returns OK on success, non-OK on failure. - // Note: consider setting options.sync = false. + // Note: consider setting options.sync = true. virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0; // If the database contains an entry for "key" store the @@ -103,7 +103,9 @@ class DB { // // "leveldb.num-files-at-level" - return the number of files at level , // where is an ASCII representation of a level number (e.g. "0"). - virtual bool GetProperty(const Slice& property, uint64_t* value) = 0; + // "leveldb.stats" - returns a multi-line string that describes statistics + // about the internal operation of the DB. + virtual bool GetProperty(const Slice& property, std::string* value) = 0; // For each i in [0,n-1], store in "sizes[i]", the approximate // file system space used by keys in "[range[i].start .. range[i].limit)". diff --git a/include/leveldb/env.h b/include/leveldb/env.h old mode 100644 new mode 100755 diff --git a/include/leveldb/iterator.h b/include/leveldb/iterator.h old mode 100644 new mode 100755 diff --git a/include/leveldb/options.h b/include/leveldb/options.h old mode 100644 new mode 100755 index 0b656246c..87d388e73 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -69,15 +69,14 @@ struct Options { // ------------------- // Parameters that affect performance - // Amount of data to build up in memory before converting to an - // on-disk file. + // Amount of data to build up in memory (backed by an unsorted log + // on disk) before converting to a sorted on-disk file. // - // Some DB operations may encounter a delay proportional to the size - // of this parameter. Therefore we recommend against increasing - // this parameter unless you are willing to live with an occasional - // slow operation in exchange for faster bulk loading throughput. + // Larger values increase performance, especially during bulk loads. + // Up to two write buffers may be held in memory at the same time, + // so you may wish to adjust this parameter to control memory usage. // - // Default: 1MB + // Default: 4MB size_t write_buffer_size; // Number of open files that can be used by the DB. You may need to @@ -100,7 +99,8 @@ struct Options { // Control over blocks (user data is stored in a set of blocks, and // a block is the unit of reading from disk). - // Use the specified cache for blocks (if non-NULL). + // If non-NULL, use the specified cache for blocks. + // If NULL, leveldb will automatically create and use an 8MB internal cache. // Default: NULL Cache* block_cache; @@ -109,7 +109,7 @@ struct Options { // actual size of the unit read from disk may be smaller if // compression is enabled. This parameter can be changed dynamically. // - // Default: 8K + // Default: 4K int block_size; // Number of keys between restart points for delta encoding of keys. @@ -177,7 +177,12 @@ struct WriteOptions { // crashes (i.e., the machine does not reboot), no writes will be // lost even if sync==false. // - // Default: true + // In other words, a DB write with sync==false has similar + // crash semantics as the "write()" system call. A DB write + // with sync==true has similar crash semantics to a "write()" + // system call followed by "fsync()". + // + // Default: false bool sync; // If "post_write_snapshot" is non-NULL, and the write succeeds, @@ -193,7 +198,7 @@ struct WriteOptions { const Snapshot** post_write_snapshot; WriteOptions() - : sync(true), + : sync(false), post_write_snapshot(NULL) { } }; diff --git a/include/leveldb/slice.h b/include/leveldb/slice.h old mode 100644 new mode 100755 diff --git a/include/leveldb/status.h b/include/leveldb/status.h old mode 100644 new mode 100755 diff --git a/include/leveldb/table.h b/include/leveldb/table.h old mode 100644 new mode 100755 diff --git a/include/leveldb/table_builder.h b/include/leveldb/table_builder.h old mode 100644 new mode 100755 diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h old mode 100644 new mode 100755 diff --git a/leveldb.gyp b/leveldb.gyp old mode 100644 new mode 100755 diff --git a/port/README b/port/README old mode 100644 new mode 100755 diff --git a/port/port.h b/port/port.h old mode 100644 new mode 100755 diff --git a/port/port_android.cc b/port/port_android.cc old mode 100644 new mode 100755 index 8a74111bf..240e9cacd --- a/port/port_android.cc +++ b/port/port_android.cc @@ -24,7 +24,6 @@ int fdatasync(int fd) { } } -// TODO(gabor): This is copied from port_posix.cc - not sure if I should do this? namespace leveldb { namespace port { diff --git a/port/port_android.h b/port/port_android.h old mode 100644 new mode 100755 index ca0362d1b..8680951b0 --- a/port/port_android.h +++ b/port/port_android.h @@ -15,6 +15,20 @@ #include #include +// Collapse the plethora of ARM flavors available to an easier to manage set +// Defs reference is at https://wiki.edubuntu.org/ARM/Thumb2PortingHowto +#if defined(__ARM_ARCH_6__) || \ + defined(__ARM_ARCH_6J__) || \ + defined(__ARM_ARCH_6K__) || \ + defined(__ARM_ARCH_6Z__) || \ + defined(__ARM_ARCH_6T2__) || \ + defined(__ARM_ARCH_6ZK__) || \ + defined(__ARM_ARCH_7__) || \ + defined(__ARM_ARCH_7R__) || \ + defined(__ARM_ARCH_7A__) +#define ARMV6_OR_7 1 +#endif + extern "C" { size_t fread_unlocked(void *a, size_t b, size_t c, FILE *d); size_t fwrite_unlocked(const void *a, size_t b, size_t c, FILE *d); @@ -61,28 +75,50 @@ class CondVar { pthread_cond_t cv_; }; +#ifndef ARMV6_OR_7 +// On ARM chipsets rep_; + void* rep_; + + inline void MemoryBarrier() const { + // TODO(gabor): This only works on Android instruction sets >= V6 +#ifdef ARMV6_OR_7 + __asm__ __volatile__("dmb" : : : "memory"); +#else + pLinuxKernelMemoryBarrier(); +#endif + } + public: AtomicPointer() { } explicit AtomicPointer(void* v) : rep_(v) { } inline void* Acquire_Load() const { - return rep_.load(std::memory_order_acquire); + void* r = rep_; + MemoryBarrier(); + return r; } inline void Release_Store(void* v) { - rep_.store(v, std::memory_order_release); + MemoryBarrier(); + rep_ = v; } inline void* NoBarrier_Load() const { - return rep_.load(std::memory_order_relaxed); + void* r = rep_; + return r; } inline void NoBarrier_Store(void* v) { - rep_.store(v, std::memory_order_relaxed); + rep_ = v; } }; -// TODO(gabor): Implement actual compress +// TODO(gabor): Implement compress inline bool Snappy_Compress( const char* input, size_t input_length, @@ -90,7 +126,7 @@ inline bool Snappy_Compress( return false; } -// TODO(gabor): Implement actual uncompress +// TODO(gabor): Implement uncompress inline bool Snappy_Uncompress( const char* input_data, size_t input_length, diff --git a/port/port_chromium.cc b/port/port_chromium.cc old mode 100644 new mode 100755 diff --git a/port/port_chromium.h b/port/port_chromium.h old mode 100644 new mode 100755 diff --git a/port/port_example.h b/port/port_example.h old mode 100644 new mode 100755 diff --git a/port/port_posix.cc b/port/port_posix.cc old mode 100644 new mode 100755 diff --git a/port/port_posix.h b/port/port_posix.h old mode 100644 new mode 100755 diff --git a/port/sha1_portable.cc b/port/sha1_portable.cc old mode 100644 new mode 100755 diff --git a/port/sha1_portable.h b/port/sha1_portable.h old mode 100644 new mode 100755 diff --git a/port/sha1_test.cc b/port/sha1_test.cc old mode 100644 new mode 100755 diff --git a/port/win/stdint.h b/port/win/stdint.h old mode 100644 new mode 100755 diff --git a/table/block.cc b/table/block.cc old mode 100644 new mode 100755 diff --git a/table/block.h b/table/block.h old mode 100644 new mode 100755 diff --git a/table/block_builder.cc b/table/block_builder.cc old mode 100644 new mode 100755 diff --git a/table/block_builder.h b/table/block_builder.h old mode 100644 new mode 100755 diff --git a/table/format.cc b/table/format.cc old mode 100644 new mode 100755 diff --git a/table/format.h b/table/format.h old mode 100644 new mode 100755 diff --git a/table/iterator.cc b/table/iterator.cc old mode 100644 new mode 100755 diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h old mode 100644 new mode 100755 diff --git a/table/merger.cc b/table/merger.cc old mode 100644 new mode 100755 diff --git a/table/merger.h b/table/merger.h old mode 100644 new mode 100755 diff --git a/table/table.cc b/table/table.cc old mode 100644 new mode 100755 diff --git a/table/table_builder.cc b/table/table_builder.cc old mode 100644 new mode 100755 diff --git a/table/table_test.cc b/table/table_test.cc old mode 100644 new mode 100755 index e0c71343e..4b3e85e22 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -725,10 +725,10 @@ TEST(Harness, RandomizedLongDB) { Test(&rnd); // We must have created enough data to force merging - uint64_t l0_files, l1_files; + std::string l0_files, l1_files; ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level0", &l0_files)); ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level1", &l1_files)); - ASSERT_GT(l0_files + l1_files, 0); + ASSERT_GT(atoi(l0_files.c_str()) + atoi(l1_files.c_str()), 0); } diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc old mode 100644 new mode 100755 diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h old mode 100644 new mode 100755 diff --git a/util/arena.cc b/util/arena.cc old mode 100644 new mode 100755 diff --git a/util/arena.h b/util/arena.h old mode 100644 new mode 100755 diff --git a/util/arena_test.cc b/util/arena_test.cc old mode 100644 new mode 100755 diff --git a/util/cache.cc b/util/cache.cc old mode 100644 new mode 100755 diff --git a/util/cache_test.cc b/util/cache_test.cc old mode 100644 new mode 100755 diff --git a/util/coding.cc b/util/coding.cc old mode 100644 new mode 100755 diff --git a/util/coding.h b/util/coding.h old mode 100644 new mode 100755 diff --git a/util/coding_test.cc b/util/coding_test.cc old mode 100644 new mode 100755 diff --git a/util/comparator.cc b/util/comparator.cc old mode 100644 new mode 100755 diff --git a/util/crc32c.cc b/util/crc32c.cc old mode 100644 new mode 100755 diff --git a/util/crc32c.h b/util/crc32c.h old mode 100644 new mode 100755 diff --git a/util/crc32c_test.cc b/util/crc32c_test.cc old mode 100644 new mode 100755 diff --git a/util/env.cc b/util/env.cc old mode 100644 new mode 100755 diff --git a/util/env_chromium.cc b/util/env_chromium.cc old mode 100644 new mode 100755 diff --git a/util/env_posix.cc b/util/env_posix.cc old mode 100644 new mode 100755 diff --git a/util/env_test.cc b/util/env_test.cc old mode 100644 new mode 100755 diff --git a/util/hash.cc b/util/hash.cc old mode 100644 new mode 100755 diff --git a/util/hash.h b/util/hash.h old mode 100644 new mode 100755 diff --git a/util/histogram.cc b/util/histogram.cc old mode 100644 new mode 100755 diff --git a/util/histogram.h b/util/histogram.h old mode 100644 new mode 100755 diff --git a/util/logging.cc b/util/logging.cc old mode 100644 new mode 100755 diff --git a/util/logging.h b/util/logging.h old mode 100644 new mode 100755 diff --git a/util/mutexlock.h b/util/mutexlock.h old mode 100644 new mode 100755 diff --git a/util/options.cc b/util/options.cc old mode 100644 new mode 100755 index 421608bb0..29272fe1a --- a/util/options.cc +++ b/util/options.cc @@ -16,11 +16,11 @@ Options::Options() paranoid_checks(false), env(Env::Default()), info_log(NULL), - write_buffer_size(1<<20), + write_buffer_size(4<<20), max_open_files(1000), large_value_threshold(65536), block_cache(NULL), - block_size(8192), + block_size(4096), block_restart_interval(16), compression(kSnappyCompression) { } diff --git a/util/random.h b/util/random.h old mode 100644 new mode 100755 diff --git a/util/status.cc b/util/status.cc old mode 100644 new mode 100755 diff --git a/util/testharness.cc b/util/testharness.cc old mode 100644 new mode 100755 diff --git a/util/testharness.h b/util/testharness.h old mode 100644 new mode 100755 diff --git a/util/testutil.cc b/util/testutil.cc old mode 100644 new mode 100755 diff --git a/util/testutil.h b/util/testutil.h old mode 100644 new mode 100755