Make sure the log file is synced before flushing the memtable of one column family

Summary: MultiPut atomicity is broken across column families if we do not sync the WAL before flushing one column family. The WAL may contain a write batch with updates both to the column family being flushed and to other column families. If we flush without syncing the WAL first and the machine crashes after the flush, the write batch is only partially recovered: the data written to the other column families is lost.
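To make the failure window concrete, here is a minimal sketch against the public RocksDB API (illustrative only, not part of this diff; the DB path and values are made up):

    #include <cassert>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"
    #include "rocksdb/write_batch.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      rocksdb::DB* db;
      rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/multiput_demo", &db);
      assert(s.ok());
      rocksdb::ColumnFamilyHandle* cf_one;
      s = db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "one", &cf_one);
      assert(s.ok());

      // One write batch spanning two column families. Write() applies it
      // atomically, and both updates land in the same WAL record.
      rocksdb::WriteBatch batch;
      batch.Put(db->DefaultColumnFamily(), "foo", "bar");
      batch.Put(cf_one, "foo", "bar");
      s = db->Write(rocksdb::WriteOptions(), &batch);  // WAL appended, not synced
      assert(s.ok());

      // Flushing only the default column family persists its half of the batch
      // in an SST. Without syncing the WAL first, a machine crash right after
      // this call can recover the flushed half while losing cf_one's update,
      // so the batch is only partially recovered.
      s = db->Flush(rocksdb::FlushOptions(), db->DefaultColumnFamily());
      assert(s.ok());

      delete cf_one;
      delete db;
      return 0;
    }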

Test Plan: Add a new unit test which will fail without the diff.

Reviewers: yhchiang, IslamAbdelRahman, igor, yiwu

Reviewed By: yiwu

Subscribers: yiwu, leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D60915
Branch: main
Author: sdong (9 years ago)
parent 89f319c2df
commit d5a51d4de3

8 changed files (lines changed):
    1  Makefile
   33  db/column_family_test.cc
   65  db/db_impl.cc
    2  db/db_impl.h
  112  db/flush_job.cc
   12  db/flush_job.h
    8  db/flush_job_test.cc
    2  db/log_writer.h

@@ -384,6 +384,7 @@ PARALLEL_TEST = \
compact_on_deletion_collector_test \
db_compaction_filter_test \
db_compaction_test \
db_sst_test \
db_test \
db_universal_compaction_test \
fault_injection_test \

@@ -18,6 +18,7 @@
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "util/coding.h"
#include "util/fault_injection_test_env.h"
#include "util/options_parser.h"
#include "util/string_util.h"
#include "util/sync_point.h"
@@ -782,6 +783,38 @@ TEST_F(ColumnFamilyTest, LogDeletionTest) {
Close();
}
TEST_F(ColumnFamilyTest, CrashAfterFlush) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
db_options_.env = fault_env.get();
Open();
CreateColumnFamilies({"one"});
WriteBatch batch;
batch.Put(handles_[0], Slice("foo"), Slice("bar"));
batch.Put(handles_[1], Slice("foo"), Slice("bar"));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
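// Flush only the default column family. Without this fix, the WAL record
// holding the batch is not synced before the flush.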
Flush(0);
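// Simulate a crash: deactivate the filesystem so nothing more is persisted.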
fault_env->SetFilesystemActive(false);
std::vector<std::string> names;
for (auto name : names_) {
if (name != "") {
names.push_back(name);
}
}
Close();
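// Drop any file data that was never synced, as a real crash would.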
fault_env->DropUnsyncedFileData();
fault_env->ResetState();
Open(names, {});
// Write batch should be atomic.
ASSERT_EQ(Get(0, "foo"), Get(1, "foo"));
Close();
db_options_.env = env_;
}
// Makes sure that obsolete log files get deleted
TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) {
// disable flushing stale column families

@@ -1771,6 +1771,49 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
return s;
}
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
mutex_.AssertHeld();
autovector<log::Writer*, 1> logs_to_sync;
uint64_t current_log_number = logfile_number_;
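// Wait for any in-flight sync of a closed log to finish, so that each log
// is synced by at most one thread.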
while (logs_.front().number < current_log_number &&
logs_.front().getting_synced) {
log_sync_cv_.Wait();
}
for (auto it = logs_.begin();
it != logs_.end() && it->number < current_log_number; ++it) {
auto& log = *it;
assert(!log.getting_synced);
log.getting_synced = true;
logs_to_sync.push_back(log.writer);
}
Status s;
if (!logs_to_sync.empty()) {
mutex_.Unlock();
for (log::Writer* log : logs_to_sync) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
log->get_log_number());
s = log->file()->Sync(db_options_.use_fsync);
}
if (s.ok()) {
s = directories_.GetWalDir()->Fsync();
}
mutex_.Lock();
// "number <= current_log_number - 1" is equivalent to
// "number < current_log_number".
MarkLogsSynced(current_log_number - 1, true, s);
if (!s.ok()) {
bg_error_ = s;
return s;
}
}
return s;
}
Status DBImpl::FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) {
@@ -1792,13 +1835,30 @@ Status DBImpl::FlushMemTableToOutputFile(
FileMetaData file_meta;
flush_job.PickMemTable();
Status s;
if (logfile_number_ > 0 &&
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0 &&
!db_options_.disableDataSync) {
// If there is more than one column family, we need to make sure that
// all log files except the most recent one are synced. Otherwise, if
// the host crashes after the flush and before the WAL is persisted, the
// flushed SST may contain data from write batches whose updates to
// other column families are missing.
// SyncClosedLogs() may unlock and re-lock the db_mutex.
s = SyncClosedLogs(job_context);
}
// Within flush_job.Run, RocksDB may call the event listener to notify of
// file creation and deletion.
//
// Note that flush_job.Run will unlock and lock the db_mutex,
// and the EventListener callback will be called when the db_mutex
// is unlocked by the current thread.
Status s = flush_job.Run(&file_meta);
if (s.ok()) {
s = flush_job.Run(&file_meta);
}
if (s.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context,
@@ -2549,12 +2609,11 @@ Status DBImpl::SyncWAL() {
}
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
{
InstrumentedMutexLock l(&mutex_);
MarkLogsSynced(current_log_number, need_log_dir_sync, status);
}
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
return status;
}

@@ -601,6 +601,8 @@ class DBImpl : public DB {
// and blocked by any other pending_outputs_ calls)
void ReleaseFileNumberFromPendingOutputs(std::list<uint64_t>::iterator v);
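// Sync all log files except the current one. REQUIRES: mutex_ held; may
// release and re-acquire it while syncing.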
Status SyncClosedLogs(JobContext* job_context);
// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status FlushMemTableToOutputFile(ColumnFamilyData* cfd,

@@ -84,7 +84,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
output_compression_(output_compression),
stats_(stats),
event_logger_(event_logger),
measure_io_stats_(measure_io_stats) {
measure_io_stats_(measure_io_stats),
pick_memtable_called(false) {
// Update the thread status to indicate flush.
ReportStartedFlush();
TEST_SYNC_POINT("FlushJob::FlushJob()");
@@ -121,9 +122,47 @@ void FlushJob::RecordFlushIOStats() {
IOSTATS_RESET(bytes_written);
}
void FlushJob::PickMemTable() {
db_mutex_->AssertHeld();
assert(!pick_memtable_called);
pick_memtable_called = true;
// Save the contents of the earliest memtable as a new Table
cfd_->imm()->PickMemtablesToFlush(&mems_);
if (mems_.empty()) {
return;
}
ReportFlushInputSize(mems_);
// The memtables in mems_ are (implicitly) sorted in ascending order by their
// creation time. We will use the first memtable's `edit` to keep the meta
// info for this flush.
MemTable* m = mems_[0];
edit_ = m->GetEdits();
edit_->SetPrevLogNumber(0);
// SetLogNumber(log_num) indicates logs with number smaller than log_num
// will no longer be picked up for recovery.
edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
edit_->SetColumnFamily(cfd_->GetID());
// path 0 for level 0 file.
meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
base_ = cfd_->current();
base_->Ref(); // it is likely that we do not need this reference
}
Status FlushJob::Run(FileMetaData* file_meta) {
db_mutex_->AssertHeld();
assert(pick_memtable_called);
AutoThreadOperationStageUpdater stage_run(
ThreadStatus::STAGE_FLUSH_RUN);
if (mems_.empty()) {
LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush",
cfd_->GetName().c_str());
return Status::OK();
}
// I/O measurement variables
PerfLevel prev_perf_level = PerfLevel::kEnableTime;
uint64_t prev_write_nanos = 0;
@@ -139,31 +178,8 @@ Status FlushJob::Run(FileMetaData* file_meta) {
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
}
// Save the contents of the earliest memtable as a new Table
FileMetaData meta;
autovector<MemTable*> mems;
cfd_->imm()->PickMemtablesToFlush(&mems);
if (mems.empty()) {
LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush",
cfd_->GetName().c_str());
return Status::OK();
}
ReportFlushInputSize(mems);
// entries mems are (implicitly) sorted in ascending order by their created
// time. We will use the first memtable's `edit` to keep the meta info for
// this flush.
MemTable* m = mems[0];
VersionEdit* edit = m->GetEdits();
edit->SetPrevLogNumber(0);
// SetLogNumber(log_num) indicates logs with number smaller than log_num
// will no longer be picked up for recovery.
edit->SetLogNumber(mems.back()->GetNextLogNumber());
edit->SetColumnFamily(cfd_->GetID());
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table(mems, edit, &meta);
Status s = WriteLevel0Table();
if (s.ok() &&
(shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
@@ -172,18 +188,18 @@ Status FlushJob::Run(FileMetaData* file_meta) {
}
if (!s.ok()) {
cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber());
cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
} else {
TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table
s = cfd_->imm()->InstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems, versions_, db_mutex_,
meta.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
cfd_, mutable_cf_options_, mems_, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_);
}
if (s.ok() && file_meta != nullptr) {
*file_meta = meta;
*file_meta = meta_;
}
RecordFlushIOStats();
@@ -214,17 +230,11 @@ Status FlushJob::Run(FileMetaData* file_meta) {
return s;
}
Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
VersionEdit* edit, FileMetaData* meta) {
Status FlushJob::WriteLevel0Table() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_FLUSH_WRITE_L0);
db_mutex_->AssertHeld();
const uint64_t start_micros = db_options_.env->NowMicros();
// path 0 for level 0 file.
meta->fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
Version* base = cfd_->current();
base->Ref(); // it is likely that we do not need this reference
Status s;
{
db_mutex_->Unlock();
@@ -237,7 +247,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
Arena arena;
uint64_t total_num_entries = 0, total_num_deletes = 0;
size_t total_memory_usage = 0;
for (MemTable* m : mems) {
for (MemTable* m : mems_) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
@@ -249,7 +259,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
event_logger_->Log() << "job" << job_context_->job_id << "event"
<< "flush_started"
<< "num_memtables" << mems.size() << "num_entries"
<< "num_memtables" << mems_.size() << "num_entries"
<< total_num_entries << "num_deletes"
<< total_num_deletes << "memory_usage"
<< total_memory_usage;
@@ -260,13 +270,13 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
static_cast<int>(memtables.size()), &arena));
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber());
cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber());
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_);
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
env_options_, cfd_->table_cache(), iter.get(), meta,
env_options_, cfd_->table_cache(), iter.get(), &meta_,
cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(),
cfd_->GetID(), cfd_->GetName(), existing_snapshots_,
earliest_write_conflict_snapshot_, output_compression_,
@@ -280,9 +290,9 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
" bytes %s"
"%s",
cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber(),
meta->fd.GetFileSize(), s.ToString().c_str(),
meta->marked_for_compaction ? " (needs compaction)" : "");
cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber(),
meta_.fd.GetFileSize(), s.ToString().c_str(),
meta_.marked_for_compaction ? " (needs compaction)" : "");
if (!db_options_.disableDataSync && output_file_directory_ != nullptr) {
output_file_directory_->Fsync();
@@ -290,29 +300,29 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
db_mutex_->Lock();
}
base->Unref();
base_->Unref();
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
if (s.ok() && meta->fd.GetFileSize() > 0) {
if (s.ok() && meta_.fd.GetFileSize() > 0) {
// if we have more than 1 background thread, then we cannot
// insert files directly into higher levels because some other
// threads could be concurrently producing compacted files for
// that key range.
// Add file to L0
edit->AddFile(0 /* level */, meta->fd.GetNumber(), meta->fd.GetPathId(),
meta->fd.GetFileSize(), meta->smallest, meta->largest,
meta->smallest_seqno, meta->largest_seqno,
meta->marked_for_compaction);
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.smallest_seqno, meta_.largest_seqno,
meta_.marked_for_compaction);
}
// Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(1);
stats.micros = db_options_.env->NowMicros() - start_micros;
stats.bytes_written = meta->fd.GetFileSize();
stats.bytes_written = meta_.fd.GetFileSize();
cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
meta->fd.GetFileSize());
meta_.fd.GetFileSize());
RecordFlushIOStats();
return s;
}

@@ -66,6 +66,8 @@ class FlushJob {
~FlushJob();
// Require db_mutex held
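// Picks the memtable(s) to flush and records their metadata; must be
// called exactly once, before Run().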
void PickMemTable();
Status Run(FileMetaData* file_meta = nullptr);
TableProperties GetTableProperties() const { return table_properties_; }
@@ -73,8 +75,7 @@ class FlushJob {
void ReportStartedFlush();
void ReportFlushInputSize(const autovector<MemTable*>& mems);
void RecordFlushIOStats();
Status WriteLevel0Table(const autovector<MemTable*>& mems, VersionEdit* edit,
FileMetaData* meta);
Status WriteLevel0Table();
const std::string& dbname_;
ColumnFamilyData* cfd_;
const DBOptions& db_options_;
@@ -94,6 +95,13 @@ class FlushJob {
EventLogger* event_logger_;
TableProperties table_properties_;
bool measure_io_stats_;
// Variables below are set by PickMemTable():
FileMetaData meta_;
autovector<MemTable*> mems_;
VersionEdit* edit_;
Version* base_;
bool pick_memtable_called;
};
} // namespace rocksdb

@@ -94,7 +94,11 @@ TEST_F(FlushJobTest, Empty) {
env_options_, versions_.get(), &mutex_, &shutting_down_,
{}, kMaxSequenceNumber, &job_context, nullptr, nullptr,
nullptr, kNoCompression, nullptr, &event_logger, false);
ASSERT_OK(flush_job.Run());
{
InstrumentedMutexLock l(&mutex_);
flush_job.PickMemTable();
ASSERT_OK(flush_job.Run());
}
job_context.Clean();
}
@@ -135,6 +139,7 @@ TEST_F(FlushJobTest, NonEmpty) {
nullptr, kNoCompression, nullptr, &event_logger, true);
FileMetaData fd;
mutex_.Lock();
flush_job.PickMemTable();
ASSERT_OK(flush_job.Run(&fd));
mutex_.Unlock();
ASSERT_EQ(ToString(0), fd.smallest.user_key().ToString());
@@ -198,6 +203,7 @@ TEST_F(FlushJobTest, Snapshots) {
&shutting_down_, snapshots, kMaxSequenceNumber, &job_context, nullptr,
nullptr, nullptr, kNoCompression, nullptr, &event_logger, true);
mutex_.Lock();
flush_job.PickMemTable();
ASSERT_OK(flush_job.Run());
mutex_.Unlock();
mock_table_factory_->AssertSingleFile(inserted_keys);

@@ -81,6 +81,8 @@ class Writer {
WritableFileWriter* file() { return dest_.get(); }
const WritableFileWriter* file() const { return dest_.get(); }
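// Returns the log number of the log file written by this Writer.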
uint64_t get_log_number() const { return log_number_; }
private:
unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block
