From d5a51d4de3cb71aee0ee4d5578fa9927189fcad2 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 19 Jul 2016 15:12:46 -0700 Subject: [PATCH] Need to make sure log file synced before flushing memtable of one column family Summary: Multiput atomiciy is broken across multiple column families if we don't sync WAL before flushing one column family. The WAL file may contain a write batch containing writes to a key to the CF to be flushed and a key to other CF. If we don't sync WAL before flushing, if machine crashes after flushing, the write batch will only be partial recovered. Data to other CFs are lost. Test Plan: Add a new unit test which will fail without the diff. Reviewers: yhchiang, IslamAbdelRahman, igor, yiwu Reviewed By: yiwu Subscribers: yiwu, leveldb, andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D60915 --- Makefile | 1 + db/column_family_test.cc | 33 ++++++++++++ db/db_impl.cc | 65 +++++++++++++++++++++-- db/db_impl.h | 2 + db/flush_job.cc | 112 +++++++++++++++++++++------------------ db/flush_job.h | 12 ++++- db/flush_job_test.cc | 8 ++- db/log_writer.h | 2 + 8 files changed, 178 insertions(+), 57 deletions(-) diff --git a/Makefile b/Makefile index c7e5fcb3a..dad0c4fbe 100644 --- a/Makefile +++ b/Makefile @@ -384,6 +384,7 @@ PARALLEL_TEST = \ compact_on_deletion_collector_test \ db_compaction_filter_test \ db_compaction_test \ + db_sst_test \ db_test \ db_universal_compaction_test \ fault_injection_test \ diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 24730bc11..bbdc7cb12 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -18,6 +18,7 @@ #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "util/coding.h" +#include "util/fault_injection_test_env.h" #include "util/options_parser.h" #include "util/string_util.h" #include "util/sync_point.h" @@ -782,6 +783,38 @@ TEST_F(ColumnFamilyTest, LogDeletionTest) { Close(); } +TEST_F(ColumnFamilyTest, CrashAfterFlush) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(env_)); + db_options_.env = fault_env.get(); + Open(); + CreateColumnFamilies({"one"}); + + WriteBatch batch; + batch.Put(handles_[0], Slice("foo"), Slice("bar")); + batch.Put(handles_[1], Slice("foo"), Slice("bar")); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + Flush(0); + fault_env->SetFilesystemActive(false); + + std::vector names; + for (auto name : names_) { + if (name != "") { + names.push_back(name); + } + } + Close(); + fault_env->DropUnsyncedFileData(); + fault_env->ResetState(); + Open(names, {}); + + // Write batch should be atomic. + ASSERT_EQ(Get(0, "foo"), Get(1, "foo")); + + Close(); + db_options_.env = env_; +} + // Makes sure that obsolete log files get deleted TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) { // disable flushing stale column families diff --git a/db/db_impl.cc b/db/db_impl.cc index a445db805..1175b8d33 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1771,6 +1771,49 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, return s; } +Status DBImpl::SyncClosedLogs(JobContext* job_context) { + mutex_.AssertHeld(); + autovector logs_to_sync; + uint64_t current_log_number = logfile_number_; + while (logs_.front().number < current_log_number && + logs_.front().getting_synced) { + log_sync_cv_.Wait(); + } + for (auto it = logs_.begin(); + it != logs_.end() && it->number < current_log_number; ++it) { + auto& log = *it; + assert(!log.getting_synced); + log.getting_synced = true; + logs_to_sync.push_back(log.writer); + } + + Status s; + if (!logs_to_sync.empty()) { + mutex_.Unlock(); + + for (log::Writer* log : logs_to_sync) { + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "[JOB %d] Syncing log #%" PRIu64, job_context->job_id, + log->get_log_number()); + s = log->file()->Sync(db_options_.use_fsync); + } + if (s.ok()) { + s = directories_.GetWalDir()->Fsync(); + } + + mutex_.Lock(); + + // "number <= current_log_number - 1" is equivalent to + // "number < current_log_number". + MarkLogsSynced(current_log_number - 1, true, s); + if (!s.ok()) { + bg_error_ = s; + return s; + } + } + return s; +} + Status DBImpl::FlushMemTableToOutputFile( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) { @@ -1792,13 +1835,30 @@ Status DBImpl::FlushMemTableToOutputFile( FileMetaData file_meta; + flush_job.PickMemTable(); + + Status s; + if (logfile_number_ > 0 && + versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0 && + !db_options_.disableDataSync) { + // If there are more than one column families, we need to make sure that + // all the log files except the most recent one are synced. Otherwise if + // the host crashes after flushing and before WAL is persistent, the + // flushed SST may contain data from write batches whose updates to + // other column families are missing. + // SyncClosedLogs() may unlock and re-lock the db_mutex. + s = SyncClosedLogs(job_context); + } + // Within flush_job.Run, rocksdb may call event listener to notify // file creation and deletion. // // Note that flush_job.Run will unlock and lock the db_mutex, // and EventListener callback will be called when the db_mutex // is unlocked by the current thread. - Status s = flush_job.Run(&file_meta); + if (s.ok()) { + s = flush_job.Run(&file_meta); + } if (s.ok()) { InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context, @@ -2549,12 +2609,11 @@ Status DBImpl::SyncWAL() { } TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); - TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2"); - { InstrumentedMutexLock l(&mutex_); MarkLogsSynced(current_log_number, need_log_dir_sync, status); } + TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2"); return status; } diff --git a/db/db_impl.h b/db/db_impl.h index add0d751d..ab4cbc0c0 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -601,6 +601,8 @@ class DBImpl : public DB { // and blocked by any other pending_outputs_ calls) void ReleaseFileNumberFromPendingOutputs(std::list::iterator v); + Status SyncClosedLogs(JobContext* job_context); + // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, diff --git a/db/flush_job.cc b/db/flush_job.cc index 8db50a41a..bb38f485d 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -84,7 +84,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, output_compression_(output_compression), stats_(stats), event_logger_(event_logger), - measure_io_stats_(measure_io_stats) { + measure_io_stats_(measure_io_stats), + pick_memtable_called(false) { // Update the thread status to indicate flush. ReportStartedFlush(); TEST_SYNC_POINT("FlushJob::FlushJob()"); @@ -121,9 +122,47 @@ void FlushJob::RecordFlushIOStats() { IOSTATS_RESET(bytes_written); } +void FlushJob::PickMemTable() { + db_mutex_->AssertHeld(); + assert(!pick_memtable_called); + pick_memtable_called = true; + // Save the contents of the earliest memtable as a new Table + cfd_->imm()->PickMemtablesToFlush(&mems_); + if (mems_.empty()) { + return; + } + + ReportFlushInputSize(mems_); + + // entries mems are (implicitly) sorted in ascending order by their created + // time. We will use the first memtable's `edit` to keep the meta info for + // this flush. + MemTable* m = mems_[0]; + edit_ = m->GetEdits(); + edit_->SetPrevLogNumber(0); + // SetLogNumber(log_num) indicates logs with number smaller than log_num + // will no longer be picked up for recovery. + edit_->SetLogNumber(mems_.back()->GetNextLogNumber()); + edit_->SetColumnFamily(cfd_->GetID()); + + // path 0 for level 0 file. + meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); + + base_ = cfd_->current(); + base_->Ref(); // it is likely that we do not need this reference +} + Status FlushJob::Run(FileMetaData* file_meta) { + db_mutex_->AssertHeld(); + assert(pick_memtable_called); AutoThreadOperationStageUpdater stage_run( ThreadStatus::STAGE_FLUSH_RUN); + if (mems_.empty()) { + LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush", + cfd_->GetName().c_str()); + return Status::OK(); + } + // I/O measurement variables PerfLevel prev_perf_level = PerfLevel::kEnableTime; uint64_t prev_write_nanos = 0; @@ -139,31 +178,8 @@ Status FlushJob::Run(FileMetaData* file_meta) { prev_prepare_write_nanos = IOSTATS(prepare_write_nanos); } - // Save the contents of the earliest memtable as a new Table - FileMetaData meta; - autovector mems; - cfd_->imm()->PickMemtablesToFlush(&mems); - if (mems.empty()) { - LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush", - cfd_->GetName().c_str()); - return Status::OK(); - } - - ReportFlushInputSize(mems); - - // entries mems are (implicitly) sorted in ascending order by their created - // time. We will use the first memtable's `edit` to keep the meta info for - // this flush. - MemTable* m = mems[0]; - VersionEdit* edit = m->GetEdits(); - edit->SetPrevLogNumber(0); - // SetLogNumber(log_num) indicates logs with number smaller than log_num - // will no longer be picked up for recovery. - edit->SetLogNumber(mems.back()->GetNextLogNumber()); - edit->SetColumnFamily(cfd_->GetID()); - // This will release and re-acquire the mutex. - Status s = WriteLevel0Table(mems, edit, &meta); + Status s = WriteLevel0Table(); if (s.ok() && (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) { @@ -172,18 +188,18 @@ Status FlushJob::Run(FileMetaData* file_meta) { } if (!s.ok()) { - cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber()); + cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber()); } else { TEST_SYNC_POINT("FlushJob::InstallResults"); // Replace immutable memtable with the generated Table s = cfd_->imm()->InstallMemtableFlushResults( - cfd_, mutable_cf_options_, mems, versions_, db_mutex_, - meta.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, + cfd_, mutable_cf_options_, mems_, versions_, db_mutex_, + meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, log_buffer_); } if (s.ok() && file_meta != nullptr) { - *file_meta = meta; + *file_meta = meta_; } RecordFlushIOStats(); @@ -214,17 +230,11 @@ Status FlushJob::Run(FileMetaData* file_meta) { return s; } -Status FlushJob::WriteLevel0Table(const autovector& mems, - VersionEdit* edit, FileMetaData* meta) { +Status FlushJob::WriteLevel0Table() { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_FLUSH_WRITE_L0); db_mutex_->AssertHeld(); const uint64_t start_micros = db_options_.env->NowMicros(); - // path 0 for level 0 file. - meta->fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); - - Version* base = cfd_->current(); - base->Ref(); // it is likely that we do not need this reference Status s; { db_mutex_->Unlock(); @@ -237,7 +247,7 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, Arena arena; uint64_t total_num_entries = 0, total_num_deletes = 0; size_t total_memory_usage = 0; - for (MemTable* m : mems) { + for (MemTable* m : mems_) { Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n", cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber()); @@ -249,7 +259,7 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, event_logger_->Log() << "job" << job_context_->job_id << "event" << "flush_started" - << "num_memtables" << mems.size() << "num_entries" + << "num_memtables" << mems_.size() << "num_entries" << total_num_entries << "num_deletes" << total_num_deletes << "memory_usage" << total_memory_usage; @@ -260,13 +270,13 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, static_cast(memtables.size()), &arena)); Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", - cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber()); + cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber()); TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", &output_compression_); s = BuildTable( dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, - env_options_, cfd_->table_cache(), iter.get(), meta, + env_options_, cfd_->table_cache(), iter.get(), &meta_, cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), cfd_->GetName(), existing_snapshots_, earliest_write_conflict_snapshot_, output_compression_, @@ -280,9 +290,9 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s" "%s", - cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber(), - meta->fd.GetFileSize(), s.ToString().c_str(), - meta->marked_for_compaction ? " (needs compaction)" : ""); + cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber(), + meta_.fd.GetFileSize(), s.ToString().c_str(), + meta_.marked_for_compaction ? " (needs compaction)" : ""); if (!db_options_.disableDataSync && output_file_directory_ != nullptr) { output_file_directory_->Fsync(); @@ -290,29 +300,29 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, TEST_SYNC_POINT("FlushJob::WriteLevel0Table"); db_mutex_->Lock(); } - base->Unref(); + base_->Unref(); // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. - if (s.ok() && meta->fd.GetFileSize() > 0) { + if (s.ok() && meta_.fd.GetFileSize() > 0) { // if we have more than 1 background thread, then we cannot // insert files directly into higher levels because some other // threads could be concurrently producing compacted files for // that key range. // Add file to L0 - edit->AddFile(0 /* level */, meta->fd.GetNumber(), meta->fd.GetPathId(), - meta->fd.GetFileSize(), meta->smallest, meta->largest, - meta->smallest_seqno, meta->largest_seqno, - meta->marked_for_compaction); + edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(), + meta_.fd.GetFileSize(), meta_.smallest, meta_.largest, + meta_.smallest_seqno, meta_.largest_seqno, + meta_.marked_for_compaction); } // Note that here we treat flush as level 0 compaction in internal stats InternalStats::CompactionStats stats(1); stats.micros = db_options_.env->NowMicros() - start_micros; - stats.bytes_written = meta->fd.GetFileSize(); + stats.bytes_written = meta_.fd.GetFileSize(); cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats); cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, - meta->fd.GetFileSize()); + meta_.fd.GetFileSize()); RecordFlushIOStats(); return s; } diff --git a/db/flush_job.h b/db/flush_job.h index 8c7de4b4d..285a8c14f 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -66,6 +66,8 @@ class FlushJob { ~FlushJob(); + // Require db_mutex held + void PickMemTable(); Status Run(FileMetaData* file_meta = nullptr); TableProperties GetTableProperties() const { return table_properties_; } @@ -73,8 +75,7 @@ class FlushJob { void ReportStartedFlush(); void ReportFlushInputSize(const autovector& mems); void RecordFlushIOStats(); - Status WriteLevel0Table(const autovector& mems, VersionEdit* edit, - FileMetaData* meta); + Status WriteLevel0Table(); const std::string& dbname_; ColumnFamilyData* cfd_; const DBOptions& db_options_; @@ -94,6 +95,13 @@ class FlushJob { EventLogger* event_logger_; TableProperties table_properties_; bool measure_io_stats_; + + // Variables below are set by PickMemTable(): + FileMetaData meta_; + autovector mems_; + VersionEdit* edit_; + Version* base_; + bool pick_memtable_called; }; } // namespace rocksdb diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 062ab1d50..9648c01e4 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -94,7 +94,11 @@ TEST_F(FlushJobTest, Empty) { env_options_, versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, &job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger, false); - ASSERT_OK(flush_job.Run()); + { + InstrumentedMutexLock l(&mutex_); + flush_job.PickMemTable(); + ASSERT_OK(flush_job.Run()); + } job_context.Clean(); } @@ -135,6 +139,7 @@ TEST_F(FlushJobTest, NonEmpty) { nullptr, kNoCompression, nullptr, &event_logger, true); FileMetaData fd; mutex_.Lock(); + flush_job.PickMemTable(); ASSERT_OK(flush_job.Run(&fd)); mutex_.Unlock(); ASSERT_EQ(ToString(0), fd.smallest.user_key().ToString()); @@ -198,6 +203,7 @@ TEST_F(FlushJobTest, Snapshots) { &shutting_down_, snapshots, kMaxSequenceNumber, &job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger, true); mutex_.Lock(); + flush_job.PickMemTable(); ASSERT_OK(flush_job.Run()); mutex_.Unlock(); mock_table_factory_->AssertSingleFile(inserted_keys); diff --git a/db/log_writer.h b/db/log_writer.h index 23d896746..11267b33d 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -81,6 +81,8 @@ class Writer { WritableFileWriter* file() { return dest_.get(); } const WritableFileWriter* file() const { return dest_.get(); } + uint64_t get_log_number() const { return log_number_; } + private: unique_ptr dest_; size_t block_offset_; // Current offset in block