diff --git a/db/column_family_test.cc b/db/column_family_test.cc index bbdc7cb12..c3f273739 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -501,6 +501,135 @@ TEST_F(ColumnFamilyTest, DontReuseColumnFamilyID) { } } +class FlushEmptyCFTestWithParam : public ColumnFamilyTest, + public testing::WithParamInterface { + public: + FlushEmptyCFTestWithParam() { allow_2pc_ = GetParam(); } + + // Required if inheriting from testing::WithParamInterface<> + static void SetUpTestCase() {} + static void TearDownTestCase() {} + + bool allow_2pc_; +}; + +TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(env_)); + db_options_.env = fault_env.get(); + db_options_.allow_2pc = allow_2pc_; + Open(); + CreateColumnFamilies({"one", "two"}); + // Generate log file A. + ASSERT_OK(Put(1, "foo", "v1")); // seqID 1 + + Reopen(); + // Log file A is not dropped after reopening because default column family's + // min log number is 0. + // It flushes to SST file X + ASSERT_OK(Put(1, "foo", "v1")); // seqID 2 + ASSERT_OK(Put(1, "bar", "v2")); // seqID 3 + // Current log file is file B now. While flushing, a new log file C is created + // and is set to current. Boths' min log number is set to file C in memory, so + // after flushing file B is deleted. At the same time, the min log number of + // default CF is not written to manifest. Log file A still remains. + // Flushed to SST file Y. + Flush(1); + Flush(0); + ASSERT_OK(Put(1, "bar", "v3")); // seqID 4 + ASSERT_OK(Put(1, "foo", "v4")); // seqID 5 + + // Preserve file system state up to here to simulate a crash condition. + fault_env->SetFilesystemActive(false); + std::vector names; + for (auto name : names_) { + if (name != "") { + names.push_back(name); + } + } + + Close(); + fault_env->ResetState(); + + // Before opening, there are four files: + // Log file A contains seqID 1 + // Log file C contains seqID 4, 5 + // SST file X contains seqID 1 + // SST file Y contains seqID 2, 3 + // Min log number: + // default CF: 0 + // CF one, two: C + // When opening the DB, all the seqID should be preserved. + Open(names, {}); + ASSERT_EQ("v4", Get(1, "foo")); + ASSERT_EQ("v3", Get(1, "bar")); + Close(); + + db_options_.env = env_; +} + +TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest2) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(env_)); + db_options_.env = fault_env.get(); + db_options_.allow_2pc = allow_2pc_; + Open(); + CreateColumnFamilies({"one", "two"}); + // Generate log file A. + ASSERT_OK(Put(1, "foo", "v1")); // seqID 1 + + Reopen(); + // Log file A is not dropped after reopening because default column family's + // min log number is 0. + // It flushes to SST file X + ASSERT_OK(Put(1, "foo", "v1")); // seqID 2 + ASSERT_OK(Put(1, "bar", "v2")); // seqID 3 + // Current log file is file B now. While flushing, a new log file C is created + // and is set to current. Both CFs' min log number is set to file C so after + // flushing file B is deleted. Log file A still remains. + // Flushed to SST file Y. + Flush(1); + ASSERT_OK(Put(0, "bar", "v2")); // seqID 4 + ASSERT_OK(Put(2, "bar", "v2")); // seqID 5 + ASSERT_OK(Put(1, "bar", "v3")); // seqID 6 + // Flushing all column families. This forces all CFs' min log to current. This + // is written to the manifest file. Log file C is cleared. + Flush(0); + Flush(1); + Flush(2); + // Write to log file D + ASSERT_OK(Put(1, "bar", "v4")); // seqID 7 + ASSERT_OK(Put(1, "bar", "v5")); // seqID 8 + // Preserve file system state up to here to simulate a crash condition. + fault_env->SetFilesystemActive(false); + std::vector names; + for (auto name : names_) { + if (name != "") { + names.push_back(name); + } + } + + Close(); + fault_env->ResetState(); + // Before opening, there are two logfiles: + // Log file A contains seqID 1 + // Log file D contains seqID 7, 8 + // Min log number: + // default CF: D + // CF one, two: D + // When opening the DB, log file D should be replayed using the seqID + // specified in the file. + Open(names, {}); + ASSERT_EQ("v1", Get(1, "foo")); + ASSERT_EQ("v5", Get(1, "bar")); + Close(); + + db_options_.env = env_; +} + +INSTANTIATE_TEST_CASE_P(FlushEmptyCFTestWithParam, FlushEmptyCFTestWithParam, + ::testing::Bool()); + TEST_F(ColumnFamilyTest, AddDrop) { Open(); CreateColumnFamilies({"one", "two", "three"}); diff --git a/db/db_impl.cc b/db/db_impl.cc index 1175b8d33..30bd55368 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1478,9 +1478,11 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, } recovered_sequence = sequence; + bool no_prev_seq = true; if (*next_sequence == kMaxSequenceNumber) { *next_sequence = sequence; } else { + no_prev_seq = false; WriteBatchInternal::SetSequence(&batch, *next_sequence); } @@ -1563,10 +1565,18 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // insert. We don't want to fail the whole write batch in that case -- // we just ignore the update. // That's why we set ignore missing column families to true + bool has_valid_writes = false; status = WriteBatchInternal::InsertInto( &batch, column_family_memtables_.get(), &flush_scheduler_, true, log_number, this, false /* concurrent_memtable_writes */, - next_sequence); + next_sequence, &has_valid_writes); + // If it is the first log file and there is no column family updated + // after replaying the file, this file may be a stale file. We ignore + // sequence IDs from the file. Otherwise, if a newer stale log file that + // has been deleted, the sequenceID may be wrong. + if (no_prev_seq && !has_valid_writes) { + *next_sequence = kMaxSequenceNumber; + } MaybeIgnoreError(&status); if (!status.ok()) { // We are treating this as a failure while reading since we read valid @@ -1575,7 +1585,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, continue; } - if (!read_only) { + if (has_valid_writes && !read_only) { // we can do this because this is called before client has access to the // DB and there is only a single thread operating on DB ColumnFamilyData* cfd; diff --git a/db/write_batch.cc b/db/write_batch.cc index bd81f6522..61c2eb6f4 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -694,6 +694,7 @@ class MemTableInserter : public WriteBatch::Handler { uint64_t log_number_ref_; DBImpl* db_; const bool concurrent_memtable_writes_; + bool* has_valid_writes_; typedef std::map MemPostInfoMap; MemPostInfoMap mem_post_info_map_; // current recovered transaction we are rebuilding (recovery) @@ -704,7 +705,8 @@ class MemTableInserter : public WriteBatch::Handler { FlushScheduler* flush_scheduler, bool ignore_missing_column_families, uint64_t recovering_log_number, DB* db, - bool concurrent_memtable_writes) + bool concurrent_memtable_writes, + bool* has_valid_writes = nullptr) : sequence_(sequence), cf_mems_(cf_mems), flush_scheduler_(flush_scheduler), @@ -713,6 +715,7 @@ class MemTableInserter : public WriteBatch::Handler { log_number_ref_(0), db_(reinterpret_cast(db)), concurrent_memtable_writes_(concurrent_memtable_writes), + has_valid_writes_(has_valid_writes), rebuilding_trx_(nullptr) { assert(cf_mems_); } @@ -756,6 +759,10 @@ class MemTableInserter : public WriteBatch::Handler { return false; } + if (has_valid_writes_ != nullptr) { + *has_valid_writes_ = true; + } + if (log_number_ref_ > 0) { cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_); } @@ -976,6 +983,9 @@ class MemTableInserter : public WriteBatch::Handler { // we are now iterating through a prepared section rebuilding_trx_ = new WriteBatch(); + if (has_valid_writes_ != nullptr) { + *has_valid_writes_ = true; + } } else { // in non-recovery we ignore prepare markers // and insert the values directly. making sure we have a @@ -1029,6 +1039,9 @@ class MemTableInserter : public WriteBatch::Handler { if (s.ok()) { db_->DeleteRecoveredTransaction(name.ToString()); } + if (has_valid_writes_ != nullptr) { + *has_valid_writes_ = true; + } } } else { // in non recovery we simply ignore this tag @@ -1112,16 +1125,15 @@ Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer, return s; } -Status WriteBatchInternal::InsertInto(const WriteBatch* batch, - ColumnFamilyMemTables* memtables, - FlushScheduler* flush_scheduler, - bool ignore_missing_column_families, - uint64_t log_number, DB* db, - bool concurrent_memtable_writes, - SequenceNumber* last_seq_used) { +Status WriteBatchInternal::InsertInto( + const WriteBatch* batch, ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, bool ignore_missing_column_families, + uint64_t log_number, DB* db, bool concurrent_memtable_writes, + SequenceNumber* last_seq_used, bool* has_valid_writes) { MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables, flush_scheduler, ignore_missing_column_families, - log_number, db, concurrent_memtable_writes); + log_number, db, concurrent_memtable_writes, + has_valid_writes); Status s = batch->Iterate(&inserter); if (last_seq_used != nullptr) { *last_seq_used = inserter.get_final_sequence(); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index b259440bc..cc036cbf6 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -160,7 +160,8 @@ class WriteBatchInternal { bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false, - SequenceNumber* last_seq_used = nullptr); + SequenceNumber* last_seq_used = nullptr, + bool* has_valid_writes = nullptr); static Status InsertInto(WriteThread::Writer* writer, ColumnFamilyMemTables* memtables,