diff --git a/Makefile b/Makefile
index c7e5fcb3a..dad0c4fbe 100644
--- a/Makefile
+++ b/Makefile
@@ -384,6 +384,7 @@ PARALLEL_TEST = \
 	compact_on_deletion_collector_test \
 	db_compaction_filter_test \
 	db_compaction_test \
+	db_sst_test \
 	db_test \
 	db_universal_compaction_test \
 	fault_injection_test \
diff --git a/db/column_family_test.cc b/db/column_family_test.cc
index 24730bc11..bbdc7cb12 100644
--- a/db/column_family_test.cc
+++ b/db/column_family_test.cc
@@ -18,6 +18,7 @@
 #include "rocksdb/env.h"
 #include "rocksdb/iterator.h"
 #include "util/coding.h"
+#include "util/fault_injection_test_env.h"
 #include "util/options_parser.h"
 #include "util/string_util.h"
 #include "util/sync_point.h"
@@ -782,6 +783,38 @@ TEST_F(ColumnFamilyTest, LogDeletionTest) {
   Close();
 }
 
+TEST_F(ColumnFamilyTest, CrashAfterFlush) {
+  std::unique_ptr<FaultInjectionTestEnv> fault_env(
+      new FaultInjectionTestEnv(env_));
+  db_options_.env = fault_env.get();
+  Open();
+  CreateColumnFamilies({"one"});
+
+  WriteBatch batch;
+  batch.Put(handles_[0], Slice("foo"), Slice("bar"));
+  batch.Put(handles_[1], Slice("foo"), Slice("bar"));
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+  Flush(0);
+  fault_env->SetFilesystemActive(false);
+
+  std::vector<std::string> names;
+  for (auto name : names_) {
+    if (name != "") {
+      names.push_back(name);
+    }
+  }
+  Close();
+  fault_env->DropUnsyncedFileData();
+  fault_env->ResetState();
+  Open(names, {});
+
+  // Write batch should be atomic.
+  ASSERT_EQ(Get(0, "foo"), Get(1, "foo"));
+
+  Close();
+  db_options_.env = env_;
+}
+
 // Makes sure that obsolete log files get deleted
 TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) {
   // disable flushing stale column families
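The crash choreography in CrashAfterFlush is driven by FaultInjectionTestEnv: SetFilesystemActive(false) stops persisting new writes, Close() stands in for the crash, DropUnsyncedFileData() discards everything that was never synced, and the reopen runs recovery against whatever survived. The idea reduces to a tiny model; the following toy (plain C++, not RocksDB's env, with illustrative file names) shows the torn state the test hunts for: the flushed SST outlives the crash while the unsynced WAL, still carrying the other column family's half of the batch, does not.

    // Toy model of FaultInjectionTestEnv's crash semantics (illustrative only).
    #include <cassert>
    #include <map>
    #include <string>

    struct ToyFaultyFs {
      std::map<std::string, std::string> durable;   // survives a "crash"
      std::map<std::string, std::string> unsynced;  // lost by a "crash"

      void Write(const std::string& f, const std::string& d) { unsynced[f] += d; }
      void Sync(const std::string& f) {  // fsync: promote buffered bytes
        durable[f] += unsynced[f];
        unsynced.erase(f);
      }
      void DropUnsyncedFileData() { unsynced.clear(); }  // the power cut
    };

    int main() {
      ToyFaultyFs fs;
      // One atomic batch, recorded in the WAL for both column families.
      fs.Write("000003.log", "put default/foo=bar; put one/foo=bar;");
      // Flush(0): the default CF's memtable becomes an SST, and the SST is
      // fsync'ed. The WAL, however, was never synced.
      fs.Write("000007.sst", "foo=bar");
      fs.Sync("000007.sst");
      fs.DropUnsyncedFileData();  // crash
      // The SST survived; the WAL carrying CF "one"'s update did not. This is
      // exactly the non-atomic recovery SyncClosedLogs() rules out.
      assert(fs.durable.count("000007.sst") == 1);
      assert(fs.durable.count("000003.log") == 0);
      return 0;
    }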
diff --git a/db/db_impl.cc b/db/db_impl.cc
index a445db805..1175b8d33 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1771,6 +1771,49 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
   return s;
 }
 
+Status DBImpl::SyncClosedLogs(JobContext* job_context) {
+  mutex_.AssertHeld();
+  autovector<log::Writer*> logs_to_sync;
+  uint64_t current_log_number = logfile_number_;
+  while (logs_.front().number < current_log_number &&
+         logs_.front().getting_synced) {
+    log_sync_cv_.Wait();
+  }
+  for (auto it = logs_.begin();
+       it != logs_.end() && it->number < current_log_number; ++it) {
+    auto& log = *it;
+    assert(!log.getting_synced);
+    log.getting_synced = true;
+    logs_to_sync.push_back(log.writer);
+  }
+
+  Status s;
+  if (!logs_to_sync.empty()) {
+    mutex_.Unlock();
+
+    for (log::Writer* log : logs_to_sync) {
+      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
+          "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
+          log->get_log_number());
+      s = log->file()->Sync(db_options_.use_fsync);
+    }
+    if (s.ok()) {
+      s = directories_.GetWalDir()->Fsync();
+    }
+
+    mutex_.Lock();
+
+    // "number <= current_log_number - 1" is equivalent to
+    // "number < current_log_number".
+    MarkLogsSynced(current_log_number - 1, true, s);
+    if (!s.ok()) {
+      bg_error_ = s;
+      return s;
+    }
+  }
+  return s;
+}
+
 Status DBImpl::FlushMemTableToOutputFile(
     ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
     bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) {
@@ -1792,13 +1835,30 @@ Status DBImpl::FlushMemTableToOutputFile(
 
   FileMetaData file_meta;
 
+  flush_job.PickMemTable();
+
+  Status s;
+  if (logfile_number_ > 0 &&
+      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0 &&
+      !db_options_.disableDataSync) {
+    // If there is more than one column family, we need to make sure that
+    // all the log files except the most recent one are synced. Otherwise, if
+    // the host crashes after flushing and before the WAL is persistent, the
+    // flushed SST may contain data from write batches whose updates to
+    // other column families are missing.
+    // SyncClosedLogs() may unlock and re-lock the db_mutex.
+    s = SyncClosedLogs(job_context);
+  }
+
   // Within flush_job.Run, rocksdb may call event listener to notify
   // file creation and deletion.
   //
   // Note that flush_job.Run will unlock and lock the db_mutex,
   // and EventListener callback will be called when the db_mutex
   // is unlocked by the current thread.
-  Status s = flush_job.Run(&file_meta);
+  if (s.ok()) {
+    s = flush_job.Run(&file_meta);
+  }
 
   if (s.ok()) {
     InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context,
@@ -2549,12 +2609,11 @@ Status DBImpl::SyncWAL() {
   }
 
   TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
-  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
-
   {
     InstrumentedMutexLock l(&mutex_);
     MarkLogsSynced(current_log_number, need_log_dir_sync, status);
   }
+  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
 
   return status;
 }
diff --git a/db/db_impl.h b/db/db_impl.h
index add0d751d..ab4cbc0c0 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -601,6 +601,8 @@ class DBImpl : public DB {
   // and blocked by any other pending_outputs_ calls)
   void ReleaseFileNumberFromPendingOutputs(std::list<uint64_t>::iterator v);
 
+  Status SyncClosedLogs(JobContext* job_context);
+
   // Flush the in-memory write buffer to storage.  Switches to a new
   // log-file/memtable and writes a new descriptor iff successful.
   Status FlushMemTableToOutputFile(ColumnFamilyData* cfd,
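For readers skimming the new function: SyncClosedLogs() follows DBImpl's usual discipline for slow I/O. Work is claimed under the mutex (getting_synced doubles as the claim, and the condition-variable wait handles a racing claimant), the fsyncs happen with the mutex released, and the result is published under the mutex again via MarkLogsSynced(). A distilled, self-contained sketch of that pattern, with the cv wait and bg_error_ handling elided:

    #include <cstdint>
    #include <mutex>
    #include <vector>

    struct ClosedLog {
      uint64_t number;
      bool getting_synced = false;
      bool synced = false;
    };

    // Claim under the lock, sync unlocked, publish under the lock.
    void SyncClosedLogsSketch(std::mutex& mu, std::vector<ClosedLog>& logs,
                              uint64_t current_log_number) {
      std::vector<ClosedLog*> to_sync;
      {
        std::lock_guard<std::mutex> l(mu);
        for (auto& log : logs) {
          // Only fully closed logs (number < current); the claim keeps a
          // concurrent flush of another CF from syncing the same file.
          if (log.number < current_log_number && !log.getting_synced) {
            log.getting_synced = true;
            to_sync.push_back(&log);
          }
        }
      }
      for (ClosedLog* log : to_sync) {
        (void)log;  // stand-in for log->file()->Sync() + WAL-dir fsync
      }
      std::lock_guard<std::mutex> l(mu);
      for (ClosedLog* log : to_sync) {  // corresponds to MarkLogsSynced()
        log->getting_synced = false;
        log->synced = true;
      }
    }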
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 8db50a41a..bb38f485d 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -84,7 +84,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
       output_compression_(output_compression),
       stats_(stats),
       event_logger_(event_logger),
-      measure_io_stats_(measure_io_stats) {
+      measure_io_stats_(measure_io_stats),
+      pick_memtable_called(false) {
   // Update the thread status to indicate flush.
   ReportStartedFlush();
   TEST_SYNC_POINT("FlushJob::FlushJob()");
@@ -121,9 +122,47 @@ void FlushJob::RecordFlushIOStats() {
   IOSTATS_RESET(bytes_written);
 }
 
+void FlushJob::PickMemTable() {
+  db_mutex_->AssertHeld();
+  assert(!pick_memtable_called);
+  pick_memtable_called = true;
+  // Save the contents of the earliest memtable as a new Table
+  cfd_->imm()->PickMemtablesToFlush(&mems_);
+  if (mems_.empty()) {
+    return;
+  }
+
+  ReportFlushInputSize(mems_);
+
+  // entries mems are (implicitly) sorted in ascending order by their created
+  // time. We will use the first memtable's `edit` to keep the meta info for
+  // this flush.
+  MemTable* m = mems_[0];
+  edit_ = m->GetEdits();
+  edit_->SetPrevLogNumber(0);
+  // SetLogNumber(log_num) indicates logs with number smaller than log_num
+  // will no longer be picked up for recovery.
+  edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
+  edit_->SetColumnFamily(cfd_->GetID());
+
+  // path 0 for level 0 file.
+  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
+
+  base_ = cfd_->current();
+  base_->Ref();  // it is likely that we do not need this reference
+}
+
 Status FlushJob::Run(FileMetaData* file_meta) {
+  db_mutex_->AssertHeld();
+  assert(pick_memtable_called);
   AutoThreadOperationStageUpdater stage_run(
       ThreadStatus::STAGE_FLUSH_RUN);
+  if (mems_.empty()) {
+    LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush",
+                cfd_->GetName().c_str());
+    return Status::OK();
+  }
+
   // I/O measurement variables
   PerfLevel prev_perf_level = PerfLevel::kEnableTime;
   uint64_t prev_write_nanos = 0;
@@ -139,31 +178,8 @@ Status FlushJob::Run(FileMetaData* file_meta) {
     prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
   }
 
-  // Save the contents of the earliest memtable as a new Table
-  FileMetaData meta;
-  autovector<MemTable*> mems;
-  cfd_->imm()->PickMemtablesToFlush(&mems);
-  if (mems.empty()) {
-    LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush",
-                cfd_->GetName().c_str());
-    return Status::OK();
-  }
-
-  ReportFlushInputSize(mems);
-
-  // entries mems are (implicitly) sorted in ascending order by their created
-  // time. We will use the first memtable's `edit` to keep the meta info for
-  // this flush.
-  MemTable* m = mems[0];
-  VersionEdit* edit = m->GetEdits();
-  edit->SetPrevLogNumber(0);
-  // SetLogNumber(log_num) indicates logs with number smaller than log_num
-  // will no longer be picked up for recovery.
-  edit->SetLogNumber(mems.back()->GetNextLogNumber());
-  edit->SetColumnFamily(cfd_->GetID());
-  // This will release and re-acquire the mutex.
-  Status s = WriteLevel0Table(mems, edit, &meta);
+  Status s = WriteLevel0Table();
 
   if (s.ok() &&
       (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
@@ -172,18 +188,18 @@
   }
 
   if (!s.ok()) {
-    cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber());
+    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
   } else {
     TEST_SYNC_POINT("FlushJob::InstallResults");
     // Replace immutable memtable with the generated Table
     s = cfd_->imm()->InstallMemtableFlushResults(
-        cfd_, mutable_cf_options_, mems, versions_, db_mutex_,
-        meta.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
+        cfd_, mutable_cf_options_, mems_, versions_, db_mutex_,
+        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
         log_buffer_);
   }
 
   if (s.ok() && file_meta != nullptr) {
-    *file_meta = meta;
+    *file_meta = meta_;
   }
   RecordFlushIOStats();
 
@@ -214,17 +230,11 @@
   return s;
 }
 
-Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
-                                  VersionEdit* edit, FileMetaData* meta) {
+Status FlushJob::WriteLevel0Table() {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_FLUSH_WRITE_L0);
   db_mutex_->AssertHeld();
   const uint64_t start_micros = db_options_.env->NowMicros();
-  // path 0 for level 0 file.
-  meta->fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
-
-  Version* base = cfd_->current();
-  base->Ref();  // it is likely that we do not need this reference
   Status s;
   {
     db_mutex_->Unlock();
@@ -237,7 +247,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
     Arena arena;
     uint64_t total_num_entries = 0, total_num_deletes = 0;
     size_t total_memory_usage = 0;
-    for (MemTable* m : mems) {
+    for (MemTable* m : mems_) {
       Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
           "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
           cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
@@ -249,7 +259,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
 
     event_logger_->Log() << "job" << job_context_->job_id << "event"
                          << "flush_started"
-                         << "num_memtables" << mems.size() << "num_entries"
+                         << "num_memtables" << mems_.size() << "num_entries"
                          << total_num_entries << "num_deletes"
                          << total_num_deletes << "memory_usage"
                          << total_memory_usage;
@@ -260,13 +270,13 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
         static_cast<int>(memtables.size()), &arena));
     Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
         "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
-        cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber());
+        cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber());
 
     TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                              &output_compression_);
     s = BuildTable(
         dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
-        env_options_, cfd_->table_cache(), iter.get(), meta,
+        env_options_, cfd_->table_cache(), iter.get(), &meta_,
         cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(),
         cfd_->GetID(), cfd_->GetName(), existing_snapshots_,
         earliest_write_conflict_snapshot_, output_compression_,
@@ -280,9 +290,9 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
        "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
        " bytes %s"
        "%s",
-        cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber(),
-        meta->fd.GetFileSize(), s.ToString().c_str(),
-        meta->marked_for_compaction ? " (needs compaction)" : "");
+        cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber(),
+        meta_.fd.GetFileSize(), s.ToString().c_str(),
+        meta_.marked_for_compaction ? " (needs compaction)" : "");
 
     if (!db_options_.disableDataSync && output_file_directory_ != nullptr) {
       output_file_directory_->Fsync();
@@ -290,29 +300,29 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
     TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
     db_mutex_->Lock();
   }
-  base->Unref();
+  base_->Unref();
 
   // Note that if file_size is zero, the file has been deleted and
   // should not be added to the manifest.
-  if (s.ok() && meta->fd.GetFileSize() > 0) {
+  if (s.ok() && meta_.fd.GetFileSize() > 0) {
     // if we have more than 1 background thread, then we cannot
     // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
     // Add file to L0
-    edit->AddFile(0 /* level */, meta->fd.GetNumber(), meta->fd.GetPathId(),
-                  meta->fd.GetFileSize(), meta->smallest, meta->largest,
-                  meta->smallest_seqno, meta->largest_seqno,
-                  meta->marked_for_compaction);
+    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
+                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
+                   meta_.smallest_seqno, meta_.largest_seqno,
+                   meta_.marked_for_compaction);
   }
 
   // Note that here we treat flush as level 0 compaction in internal stats
   InternalStats::CompactionStats stats(1);
   stats.micros = db_options_.env->NowMicros() - start_micros;
-  stats.bytes_written = meta->fd.GetFileSize();
+  stats.bytes_written = meta_.fd.GetFileSize();
   cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
   cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
-                                     meta->fd.GetFileSize());
+                                     meta_.fd.GetFileSize());
   RecordFlushIOStats();
   return s;
 }
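The reason PickMemTable() is split out of Run() is ordering. The job must first pin down which memtables it will flush, and therefore which WAL cutoff recovery will honor (VersionEdit::SetLogNumber), before the caller syncs the closed WALs; only after that sync succeeds is the SST written and installed. A stub-typed sketch of the sequence FlushMemTableToOutputFile() now enforces (the Sketch-suffixed names are stand-ins, not RocksDB types):

    // Stub types; the real signatures live in db/flush_job.h and db/db_impl.h.
    struct StatusSketch {
      bool ok_;
      bool ok() const { return ok_; }
    };

    struct FlushJobSketch {
      bool picked = false;
      void PickMemTable() { picked = true; }               // step 1: fix the input set
      StatusSketch Run() { return StatusSketch{picked}; }  // step 3: write the L0 file
    };

    StatusSketch SyncClosedLogsSketch() { return StatusSketch{true}; }  // step 2

    StatusSketch FlushMemTableToOutputFileSketch(FlushJobSketch& job) {
      job.PickMemTable();                       // decide what will be flushed
      StatusSketch s = SyncClosedLogsSketch();  // make older WALs durable first
      if (s.ok()) {
        s = job.Run();                          // only then persist the SST
      }
      return s;
    }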
diff --git a/db/flush_job.h b/db/flush_job.h
index 8c7de4b4d..285a8c14f 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -66,6 +66,8 @@ class FlushJob {
 
   ~FlushJob();
 
+  // Requires db_mutex held.
+  void PickMemTable();
   Status Run(FileMetaData* file_meta = nullptr);
   TableProperties GetTableProperties() const { return table_properties_; }
 
@@ -73,8 +75,7 @@ class FlushJob {
   void ReportStartedFlush();
   void ReportFlushInputSize(const autovector<MemTable*>& mems);
   void RecordFlushIOStats();
-  Status WriteLevel0Table(const autovector<MemTable*>& mems, VersionEdit* edit,
-                          FileMetaData* meta);
+  Status WriteLevel0Table();
   const std::string& dbname_;
   ColumnFamilyData* cfd_;
   const DBOptions& db_options_;
@@ -94,6 +95,13 @@ class FlushJob {
   EventLogger* event_logger_;
   TableProperties table_properties_;
   bool measure_io_stats_;
+
+  // Variables below are set by PickMemTable():
+  FileMetaData meta_;
+  autovector<MemTable*> mems_;
+  VersionEdit* edit_;
+  Version* base_;
+  bool pick_memtable_called;
 };
 
 }  // namespace rocksdb
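The pick_memtable_called flag exists to make the two-phase contract loud rather than silent: PickMemTable() asserts it runs at most once, and Run() asserts it never runs first, mirroring the assert pair added in flush_job.cc above. In miniature:

    #include <cassert>

    struct TwoPhaseJobSketch {  // stand-in, not the real FlushJob
      bool picked = false;
      void Pick() {
        assert(!picked);  // at most once
        picked = true;
      }
      void Run() {
        assert(picked);  // never before Pick()
      }
    };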
diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc
index 062ab1d50..9648c01e4 100644
--- a/db/flush_job_test.cc
+++ b/db/flush_job_test.cc
@@ -94,7 +94,11 @@ TEST_F(FlushJobTest, Empty) {
                      env_options_, versions_.get(), &mutex_, &shutting_down_,
                      {}, kMaxSequenceNumber, &job_context, nullptr, nullptr,
                      nullptr, kNoCompression, nullptr, &event_logger, false);
-  ASSERT_OK(flush_job.Run());
+  {
+    InstrumentedMutexLock l(&mutex_);
+    flush_job.PickMemTable();
+    ASSERT_OK(flush_job.Run());
+  }
   job_context.Clean();
 }
 
@@ -135,6 +139,7 @@ TEST_F(FlushJobTest, NonEmpty) {
                      nullptr, kNoCompression, nullptr, &event_logger, true);
   FileMetaData fd;
   mutex_.Lock();
+  flush_job.PickMemTable();
   ASSERT_OK(flush_job.Run(&fd));
   mutex_.Unlock();
   ASSERT_EQ(ToString(0), fd.smallest.user_key().ToString());
@@ -198,6 +203,7 @@ TEST_F(FlushJobTest, Snapshots) {
                      &shutting_down_, snapshots, kMaxSequenceNumber,
                      &job_context, nullptr, nullptr, nullptr, kNoCompression,
                      nullptr, &event_logger, true);
   mutex_.Lock();
+  flush_job.PickMemTable();
   ASSERT_OK(flush_job.Run());
   mutex_.Unlock();
   mock_table_factory_->AssertSingleFile(inserted_keys);
diff --git a/db/log_writer.h b/db/log_writer.h
index 23d896746..11267b33d 100644
--- a/db/log_writer.h
+++ b/db/log_writer.h
@@ -81,6 +81,8 @@ class Writer {
   WritableFileWriter* file() { return dest_.get(); }
   const WritableFileWriter* file() const { return dest_.get(); }
 
+  uint64_t get_log_number() const { return log_number_; }
+
  private:
   unique_ptr<WritableFileWriter> dest_;
   size_t block_offset_;       // Current offset in block
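Taken together, the patch restores an invariant that can be stated entirely in terms of the public API: a WriteBatch spanning column families recovers all-or-nothing, even when one family is flushed moments before a crash. A minimal demonstration of that contract (illustrative DB path and options; error handling trimmed):

    #include <cassert>
    #include <string>
    #include <vector>

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"
    #include "rocksdb/write_batch.h"

    int main() {
      using namespace rocksdb;
      Options options;
      options.create_if_missing = true;
      options.create_missing_column_families = true;

      std::vector<ColumnFamilyDescriptor> cf_descs;
      cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
      cf_descs.emplace_back("one", ColumnFamilyOptions());

      std::vector<ColumnFamilyHandle*> handles;
      DB* db = nullptr;
      Status s = DB::Open(options, "/tmp/crash_after_flush_demo", cf_descs,
                          &handles, &db);
      assert(s.ok());

      // One batch touching both column families: must recover atomically.
      WriteBatch batch;
      batch.Put(handles[0], "foo", "bar");
      batch.Put(handles[1], "foo", "bar");
      assert(db->Write(WriteOptions(), &batch).ok());

      // Flushing only the default CF turns its half of the batch into an SST.
      // Before this patch, a crash right here (old WAL unsynced) could leave
      // "foo" visible in the default CF but missing from "one" after recovery.
      db->Flush(FlushOptions(), handles[0]);

      for (auto* h : handles) {
        delete h;
      }
      delete db;
      return 0;
    }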