diff --git a/db/db_bench.cc b/db/db_bench.cc index d3ec61bd9..bb63e59d1 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -280,6 +280,7 @@ struct ThreadState { int tid; // 0..n-1 when running in n threads Random rand; // Has different seeds for different threads Stats stats; + SharedState* shared; ThreadState(int index) : tid(index), @@ -418,13 +419,14 @@ class Benchmark { // Reset parameters that may be overriddden bwlow num_ = FLAGS_num; - reads_ = num_; + reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads); value_size_ = FLAGS_value_size; entries_per_batch_ = 1; write_options_ = WriteOptions(); void (Benchmark::*method)(ThreadState*) = NULL; bool fresh_db = false; + int num_threads = FLAGS_threads; if (name == Slice("fillseq")) { fresh_db = true; @@ -460,6 +462,9 @@ class Benchmark { } else if (name == Slice("readrandomsmall")) { reads_ /= 1000; method = &Benchmark::ReadRandom; + } else if (name == Slice("readwhilewriting")) { + num_threads++; // Add extra thread for writing + method = &Benchmark::ReadWhileWriting; } else if (name == Slice("compact")) { method = &Benchmark::Compact; } else if (name == Slice("crc32c")) { @@ -494,7 +499,7 @@ class Benchmark { } if (method != NULL) { - RunBenchmark(name, method); + RunBenchmark(num_threads, name, method); } } } @@ -535,8 +540,8 @@ class Benchmark { } } - void RunBenchmark(Slice name, void (Benchmark::*method)(ThreadState*)) { - const int n = FLAGS_threads; + void RunBenchmark(int n, Slice name, + void (Benchmark::*method)(ThreadState*)) { SharedState shared; shared.total = n; shared.num_initialized = 0; @@ -549,6 +554,7 @@ class Benchmark { arg[i].method = method; arg[i].shared = &shared; arg[i].thread = new ThreadState(i); + arg[i].thread->shared = &shared; Env::Default()->StartThread(ThreadBody, &arg[i]); } @@ -688,7 +694,6 @@ class Benchmark { RandomGenerator gen; WriteBatch batch; Status s; - std::string val; int64_t bytes = 0; for (int i = 0; i < num_; i += entries_per_batch_) { batch.Clear(); @@ -760,6 +765,36 @@ class Benchmark { } } + void ReadWhileWriting(ThreadState* thread) { + if (thread->tid > 0) { + ReadRandom(thread); + } else { + // Special thread that keeps writing until other threads are done. + RandomGenerator gen; + while (true) { + { + MutexLock l(&thread->shared->mu); + if (thread->shared->num_done + 1 >= thread->shared->num_initialized) { + // Other threads have finished + break; + } + } + + const int k = thread->rand.Next() % FLAGS_num; + char key[100]; + snprintf(key, sizeof(key), "%016d", k); + Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); + if (!s.ok()) { + fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + exit(1); + } + } + + // Do not count any of the preceding work/delay in stats. + thread->stats.Start(); + } + } + void Compact(ThreadState* thread) { DBImpl* dbi = reinterpret_cast(db_); dbi->TEST_CompactMemTable(); diff --git a/db/db_impl.cc b/db/db_impl.cc index c4c6a614b..0ca638651 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -113,6 +113,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) logfile_(NULL), logfile_number_(0), log_(NULL), + logger_(NULL), + logger_cv_(&mutex_), bg_compaction_scheduled_(false), manual_compaction_(NULL) { mem_->Ref(); @@ -308,6 +310,11 @@ Status DBImpl::Recover(VersionEdit* edit) { std::sort(logs.begin(), logs.end()); for (size_t i = 0; i < logs.size(); i++) { s = RecoverLogFile(logs[i], edit, &max_sequence); + + // The previous incarnation may not have written any MANIFEST + // records after allocating this log number. So we manually + // update the file number allocation counter in VersionSet. + versions_->MarkFileNumberUsed(logs[i]); } if (s.ok()) { @@ -485,7 +492,7 @@ Status DBImpl::CompactMemTable() { if (s.ok()) { edit.SetPrevLogNumber(0); edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed - s = versions_->LogAndApply(&edit); + s = versions_->LogAndApply(&edit, &mutex_); } if (s.ok()) { @@ -523,7 +530,10 @@ void DBImpl::TEST_CompactRange( Status DBImpl::TEST_CompactMemTable() { MutexLock l(&mutex_); + LoggerId self; + AcquireLoggingResponsibility(&self); Status s = MakeRoomForWrite(true /* force compaction */); + ReleaseLoggingResponsibility(&self); if (s.ok()) { // Wait until the compaction completes while (imm_ != NULL && bg_error_.ok()) { @@ -600,7 +610,7 @@ void DBImpl::BackgroundCompaction() { c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); - status = versions_->LogAndApply(c->edit()); + status = versions_->LogAndApply(c->edit(), &mutex_); VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast(f->number), @@ -748,7 +758,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { } compact->outputs.clear(); - Status s = versions_->LogAndApply(compact->compaction->edit()); + Status s = versions_->LogAndApply(compact->compaction->edit(), &mutex_); if (s.ok()) { compact->compaction->ReleaseInputs(); DeleteObsoleteFiles(); @@ -1004,9 +1014,9 @@ Status DBImpl::Get(const ReadOptions& options, mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any). LookupKey lkey(key, snapshot); - if (mem_->Get(lkey, value, &s)) { + if (mem->Get(lkey, value, &s)) { // Done - } else if (imm_ != NULL && imm_->Get(lkey, value, &s)) { + } else if (imm != NULL && imm->Get(lkey, value, &s)) { // Done } else { s = current->Get(options, lkey, value, &stats); @@ -1053,34 +1063,65 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); } +// There is at most one thread that is the current logger. This call +// waits until preceding logger(s) have finished and becomes the +// current logger. +void DBImpl::AcquireLoggingResponsibility(LoggerId* self) { + while (logger_ != NULL) { + logger_cv_.Wait(); + } + logger_ = self; +} + +void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) { + assert(logger_ == self); + logger_ = NULL; + logger_cv_.SignalAll(); +} + Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Status status; MutexLock l(&mutex_); + LoggerId self; + AcquireLoggingResponsibility(&self); status = MakeRoomForWrite(false); // May temporarily release lock and wait uint64_t last_sequence = versions_->LastSequence(); if (status.ok()) { WriteBatchInternal::SetSequence(updates, last_sequence + 1); last_sequence += WriteBatchInternal::Count(updates); - versions_->SetLastSequence(last_sequence); - // Add to log and apply to memtable - status = log_->AddRecord(WriteBatchInternal::Contents(updates)); - if (status.ok() && options.sync) { - status = logfile_->Sync(); - } - if (status.ok()) { - status = WriteBatchInternal::InsertInto(updates, mem_); + // Add to log and apply to memtable. We can release the lock during + // this phase since the "logger_" flag protects against concurrent + // loggers and concurrent writes into mem_. + { + assert(logger_ == &self); + mutex_.Unlock(); + status = log_->AddRecord(WriteBatchInternal::Contents(updates)); + if (status.ok() && options.sync) { + status = logfile_->Sync(); + } + if (status.ok()) { + status = WriteBatchInternal::InsertInto(updates, mem_); + } + mutex_.Lock(); + assert(logger_ == &self); } + + versions_->SetLastSequence(last_sequence); } if (options.post_write_snapshot != NULL) { *options.post_write_snapshot = status.ok() ? snapshots_.New(last_sequence) : NULL; } + ReleaseLoggingResponsibility(&self); return status; } +// REQUIRES: mutex_ is held +// REQUIRES: this thread is the current logger Status DBImpl::MakeRoomForWrite(bool force) { mutex_.AssertHeld(); + assert(logger_ != NULL); bool allow_delay = !force; Status s; while (true) { @@ -1249,7 +1290,7 @@ Status DB::Open(const Options& options, const std::string& dbname, impl->logfile_ = lfile; impl->logfile_number_ = new_log_number; impl->log_ = new log::Writer(lfile); - s = impl->versions_->LogAndApply(&edit); + s = impl->versions_->LogAndApply(&edit, &impl->mutex_); } if (s.ok()) { impl->DeleteObsoleteFiles(); diff --git a/db/db_impl.h b/db/db_impl.h index f11ea55dd..526813760 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -87,6 +87,11 @@ class DBImpl : public DB { Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base); + // Only thread is allowed to log at a time. + struct LoggerId { }; // Opaque identifier for logging thread + void AcquireLoggingResponsibility(LoggerId* self); + void ReleaseLoggingResponsibility(LoggerId* self); + Status MakeRoomForWrite(bool force /* compact even if there is room? */); struct CompactionState; @@ -126,6 +131,8 @@ class DBImpl : public DB { WritableFile* logfile_; uint64_t logfile_number_; log::Writer* log_; + LoggerId* logger_; // NULL, or the id of the current logging thread + port::CondVar logger_cv_; // For threads waiting to log SnapshotList snapshots_; // Set of table files to protect from deletion because they are diff --git a/db/db_test.cc b/db/db_test.cc index 14eb44d7b..daa9c0308 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -10,6 +10,7 @@ #include "leveldb/env.h" #include "leveldb/table.h" #include "util/logging.h" +#include "util/mutexlock.h" #include "util/testharness.h" #include "util/testutil.h" @@ -345,6 +346,41 @@ TEST(DBTest, GetPicksCorrectFile) { ASSERT_EQ("vx", Get("x")); } +TEST(DBTest, GetEncountersEmptyLevel) { + // Arrange for the following to happen: + // * sstable A in level 0 + // * nothing in level 1 + // * sstable B in level 2 + // Then do enough Get() calls to arrange for an automatic compaction + // of sstable A. A bug would cause the compaction to be marked as + // occuring at level 1 (instead of the correct level 0). + + // Step 1: First place sstables in levels 0 and 2 + int compaction_count = 0; + while (NumTableFilesAtLevel(0) == 0 || + NumTableFilesAtLevel(2) == 0) { + ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2"; + compaction_count++; + Put("a", "begin"); + Put("z", "end"); + dbfull()->TEST_CompactMemTable(); + } + + // Step 2: clear level 1 if necessary. + dbfull()->TEST_CompactRange(1, "a", "z"); + ASSERT_EQ(NumTableFilesAtLevel(0), 1); + ASSERT_EQ(NumTableFilesAtLevel(1), 0); + ASSERT_EQ(NumTableFilesAtLevel(2), 1); + + // Step 3: read until level 0 compaction disappears. + int read_count = 0; + while (NumTableFilesAtLevel(0) > 0) { + ASSERT_LE(read_count, 10000) << "did not trigger level 0 compaction"; + read_count++; + ASSERT_EQ("NOT_FOUND", Get("missing")); + } +} + TEST(DBTest, IterEmpty) { Iterator* iter = db_->NewIterator(ReadOptions()); @@ -1355,6 +1391,9 @@ void BM_LogAndApply(int iters, int num_base_files) { Env* env = Env::Default(); + port::Mutex mu; + MutexLock l(&mu); + InternalKeyComparator cmp(BytewiseComparator()); Options options; VersionSet vset(dbname, &options, NULL, &cmp); @@ -1366,7 +1405,7 @@ void BM_LogAndApply(int iters, int num_base_files) { InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); vbase.AddFile(2, fnum++, 1 /* file size */, start, limit); } - ASSERT_OK(vset.LogAndApply(&vbase)); + ASSERT_OK(vset.LogAndApply(&vbase, &mu)); uint64_t start_micros = env->NowMicros(); @@ -1376,7 +1415,7 @@ void BM_LogAndApply(int iters, int num_base_files) { InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); vedit.AddFile(2, fnum++, 1 /* file size */, start, limit); - vset.LogAndApply(&vedit); + vset.LogAndApply(&vedit, &mu); } uint64_t stop_micros = env->NowMicros(); unsigned int us = stop_micros - start_micros; diff --git a/db/version_set.cc b/db/version_set.cc index aace6245d..d75b34771 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -250,6 +250,7 @@ Status Version::Get(const ReadOptions& options, stats->seek_file = NULL; stats->seek_file_level = -1; FileMetaData* last_file_read = NULL; + int last_file_read_level = -1; // We can search level-by-level since entries never hop across // levels. Therefore we are guaranteed that if we find data @@ -301,11 +302,12 @@ Status Version::Get(const ReadOptions& options, if (last_file_read != NULL && stats->seek_file == NULL) { // We have had more than one seek for this read. Charge the 1st file. stats->seek_file = last_file_read; - stats->seek_file_level = (i == 0 ? level - 1 : level); + stats->seek_file_level = last_file_read_level; } FileMetaData* f = files[i]; last_file_read = f; + last_file_read_level = level; Iterator* iter = vset_->table_cache_->NewIterator( options, @@ -609,7 +611,7 @@ void VersionSet::AppendVersion(Version* v) { v->next_->prev_ = v; } -Status VersionSet::LogAndApply(VersionEdit* edit) { +Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { if (edit->has_log_number_) { assert(edit->log_number_ >= log_number_); assert(edit->log_number_ < next_file_number_); @@ -637,6 +639,8 @@ Status VersionSet::LogAndApply(VersionEdit* edit) { std::string new_manifest_file; Status s; if (descriptor_log_ == NULL) { + // No reason to unlock *mu here since we only hit this path in the + // first call to LogAndApply (when opening the database). assert(descriptor_file_ == NULL); new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); edit->SetNextFile(next_file_number_); @@ -647,20 +651,27 @@ Status VersionSet::LogAndApply(VersionEdit* edit) { } } - // Write new record to MANIFEST log - if (s.ok()) { - std::string record; - edit->EncodeTo(&record); - s = descriptor_log_->AddRecord(record); + // Unlock during expensive MANIFEST log write + { + mu->Unlock(); + + // Write new record to MANIFEST log if (s.ok()) { - s = descriptor_file_->Sync(); + std::string record; + edit->EncodeTo(&record); + s = descriptor_log_->AddRecord(record); + if (s.ok()) { + s = descriptor_file_->Sync(); + } } - } - // If we just created a new descriptor file, install it by writing a - // new CURRENT file that points to it. - if (s.ok() && !new_manifest_file.empty()) { - s = SetCurrentFile(env_, dbname_, manifest_file_number_); + // If we just created a new descriptor file, install it by writing a + // new CURRENT file that points to it. + if (s.ok() && !new_manifest_file.empty()) { + s = SetCurrentFile(env_, dbname_, manifest_file_number_); + } + + mu->Lock(); } // Install the new version @@ -776,6 +787,9 @@ Status VersionSet::Recover() { if (!have_prev_log_number) { prev_log_number = 0; } + + MarkFileNumberUsed(prev_log_number); + MarkFileNumberUsed(log_number); } if (s.ok()) { @@ -794,6 +808,12 @@ Status VersionSet::Recover() { return s; } +void VersionSet::MarkFileNumberUsed(uint64_t number) { + if (next_file_number_ <= number) { + next_file_number_ = number + 1; + } +} + static int64_t TotalFileSize(const std::vector& files) { int64_t sum = 0; for (size_t i = 0; i < files.size(); i++) { diff --git a/db/version_set.h b/db/version_set.h index 693fc6fa9..2dbd9480c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -138,15 +138,14 @@ class VersionSet { // Apply *edit to the current version to form a new descriptor that // is both saved to persistent state and installed as the new - // current version. - Status LogAndApply(VersionEdit* edit); + // current version. Will release *mu while actually writing to the file. + // REQUIRES: *mu is held on entry. + // REQUIRES: no other thread concurrently calls LogAndApply() + Status LogAndApply(VersionEdit* edit, port::Mutex* mu); // Recover the last saved descriptor from persistent storage. Status Recover(); - // Save current contents to *log - Status WriteSnapshot(log::Writer* log); - // Return the current version. Version* current() const { return current_; } @@ -171,6 +170,9 @@ class VersionSet { last_sequence_ = s; } + // Mark the specified file number as used. + void MarkFileNumberUsed(uint64_t number); + // Return the current log file number. uint64_t LogNumber() const { return log_number_; } @@ -247,6 +249,9 @@ class VersionSet { void SetupOtherInputs(Compaction* c); + // Save current contents to *log + Status WriteSnapshot(log::Writer* log); + void AppendVersion(Version* v); Env* const env_;