diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index b05694017..d4ae5f2f3 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -198,7 +198,7 @@ class CompactionJobTest : public testing::Test { unique_ptr file_writer( new WritableFileWriter(std::move(file), env_options_)); { - log::Writer log(std::move(file_writer)); + log::Writer log(std::move(file_writer), 0, false); std::string record; new_db.EncodeTo(&record); s = log.AddRecord(record); diff --git a/db/db_impl.cc b/db/db_impl.cc index 67885f73a..402df5316 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -420,7 +420,7 @@ Status DBImpl::NewDB() { file->SetPreallocationBlockSize(db_options_.manifest_preallocation_size); unique_ptr file_writer( new WritableFileWriter(std::move(file), env_options)); - log::Writer log(std::move(file_writer)); + log::Writer log(std::move(file_writer), 0, false); std::string record; new_db.EncodeTo(&record); s = log.AddRecord(record); @@ -4117,7 +4117,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { mutable_cf_options.write_buffer_size); unique_ptr file_writer( new WritableFileWriter(std::move(lfile), opt_env_opt)); - new_log = new log::Writer(std::move(file_writer)); + new_log = new log::Writer(std::move(file_writer), + new_log_number, + db_options_.recycle_log_file_num > 0); } } @@ -4756,8 +4758,11 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, impl->logfile_number_ = new_log_number; unique_ptr file_writer( new WritableFileWriter(std::move(lfile), opt_env_options)); - impl->logs_.emplace_back(new_log_number, - new log::Writer(std::move(file_writer))); + impl->logs_.emplace_back( + new_log_number, + new log::Writer(std::move(file_writer), + new_log_number, + impl->db_options_.recycle_log_file_num > 0)); // set column family handles for (auto cf : column_families) { diff --git a/db/db_test.cc b/db/db_test.cc index 83db98c55..03d4e485a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5074,7 +5074,9 @@ class RecoveryTestHelper { ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options)); unique_ptr file_writer( new WritableFileWriter(std::move(file), env_options)); - current_log_writer.reset(new log::Writer(std::move(file_writer))); + current_log_writer.reset(new log::Writer( + std::move(file_writer), current_log_number, + db_options.recycle_log_file_num > 0)); for (int i = 0; i < kKeysPerWALFile; i++) { std::string key = "key" + ToString(count++); diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index d2c423c36..d20cf5e90 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -61,7 +61,7 @@ class FlushJobTest : public testing::Test { unique_ptr file_writer( new WritableFileWriter(std::move(file), EnvOptions())); { - log::Writer log(std::move(file_writer)); + log::Writer log(std::move(file_writer), 0, false); std::string record; new_db.EncodeTo(&record); s = log.AddRecord(record); diff --git a/db/log_test.cc b/db/log_test.cc index 5ab41f251..55ecd5f3f 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -43,7 +43,7 @@ static std::string RandomSkewedString(int i, Random* rnd) { return BigString(NumberString(i), rnd->Skewed(17)); } -class LogTest : public testing::Test { +class LogTest : public ::testing::TestWithParam { private: class StringSource : public SequentialFile { public: @@ -158,12 +158,11 @@ class LogTest : public testing::Test { public: LogTest() : reader_contents_(), - dest_holder_( - test::GetWritableFileWriter( - new test::StringSink(&reader_contents_))), + dest_holder_(test::GetWritableFileWriter( + new test::StringSink(&reader_contents_))), source_holder_( test::GetSequentialFileReader(new StringSource(reader_contents_))), - writer_(std::move(dest_holder_)), + writer_(std::move(dest_holder_), 123, GetParam()), reader_(std::move(source_holder_), &report_, true /*checksum*/, 0 /*initial_offset*/) {} @@ -298,9 +297,9 @@ uint64_t LogTest::initial_offset_last_record_offsets_[] = 2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize}; -TEST_F(LogTest, Empty) { ASSERT_EQ("EOF", Read()); } +TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); } -TEST_F(LogTest, ReadWrite) { +TEST_P(LogTest, ReadWrite) { Write("foo"); Write("bar"); Write(""); @@ -313,7 +312,7 @@ TEST_F(LogTest, ReadWrite) { ASSERT_EQ("EOF", Read()); // Make sure reads at eof work } -TEST_F(LogTest, ManyBlocks) { +TEST_P(LogTest, ManyBlocks) { for (int i = 0; i < 100000; i++) { Write(NumberString(i)); } @@ -323,7 +322,7 @@ TEST_F(LogTest, ManyBlocks) { ASSERT_EQ("EOF", Read()); } -TEST_F(LogTest, Fragmentation) { +TEST_P(LogTest, Fragmentation) { Write("small"); Write(BigString("medium", 50000)); Write(BigString("large", 100000)); @@ -333,7 +332,7 @@ TEST_F(LogTest, Fragmentation) { ASSERT_EQ("EOF", Read()); } -TEST_F(LogTest, MarginalTrailer) { +TEST_P(LogTest, MarginalTrailer) { // Make a trailer that is exactly the same length as an empty record. const int n = kBlockSize - 2*kHeaderSize; Write(BigString("foo", n)); @@ -346,7 +345,7 @@ TEST_F(LogTest, MarginalTrailer) { ASSERT_EQ("EOF", Read()); } -TEST_F(LogTest, MarginalTrailer2) { +TEST_P(LogTest, MarginalTrailer2) { // Make a trailer that is exactly the same length as an empty record. const int n = kBlockSize - 2*kHeaderSize; Write(BigString("foo", n)); @@ -359,7 +358,7 @@ TEST_F(LogTest, MarginalTrailer2) { ASSERT_EQ("", ReportMessage()); } -TEST_F(LogTest, ShortTrailer) { +TEST_P(LogTest, ShortTrailer) { const int n = kBlockSize - 2*kHeaderSize + 4; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize + 4), WrittenBytes()); @@ -371,7 +370,7 @@ TEST_F(LogTest, ShortTrailer) { ASSERT_EQ("EOF", Read()); } -TEST_F(LogTest, AlignedEof) { +TEST_P(LogTest, AlignedEof) { const int n = kBlockSize - 2*kHeaderSize + 4; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize + 4), WrittenBytes()); @@ -379,7 +378,7 @@ TEST_F(LogTest, AlignedEof) { ASSERT_EQ("EOF", Read()); } -TEST_F(LogTest, RandomRead) { +TEST_P(LogTest, RandomRead) { const int N = 500; Random write_rnd(301); for (int i = 0; i < N; i++) { @@ -394,7 +393,7 @@ TEST_F(LogTest, RandomRead) { // Tests of all the error paths in log_reader.cc follow: -TEST_F(LogTest, ReadError) { +TEST_P(LogTest, ReadError) { Write("foo"); ForceError(); ASSERT_EQ("EOF", Read()); @@ -402,7 +401,7 @@ TEST_F(LogTest, ReadError) { ASSERT_EQ("OK", MatchError("read error")); } -TEST_F(LogTest, BadRecordType) { +TEST_P(LogTest, BadRecordType) { Write("foo"); // Type is stored in header[6] IncrementByte(6, 100); @@ -412,7 +411,7 @@ TEST_F(LogTest, BadRecordType) { ASSERT_EQ("OK", MatchError("unknown record type")); } -TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) { +TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) { Write("foo"); ShrinkSize(4); // Drop all payload as well as a header byte ASSERT_EQ("EOF", Read()); @@ -421,7 +420,7 @@ TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) { ASSERT_EQ("", ReportMessage()); } -TEST_F(LogTest, TruncatedTrailingRecordIsNotIgnored) { +TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) { Write("foo"); ShrinkSize(4); // Drop all payload as well as a header byte ASSERT_EQ("EOF", Read(/*report_eof_inconsistency*/ true)); @@ -430,7 +429,7 @@ TEST_F(LogTest, TruncatedTrailingRecordIsNotIgnored) { ASSERT_EQ("OK", MatchError("Corruption: truncated header")); } -TEST_F(LogTest, BadLength) { +TEST_P(LogTest, BadLength) { const int kPayloadSize = kBlockSize - kHeaderSize; Write(BigString("bar", kPayloadSize)); Write("foo"); @@ -441,7 +440,7 @@ TEST_F(LogTest, BadLength) { ASSERT_EQ("OK", MatchError("bad record length")); } -TEST_F(LogTest, BadLengthAtEndIsIgnored) { +TEST_P(LogTest, BadLengthAtEndIsIgnored) { Write("foo"); ShrinkSize(1); ASSERT_EQ("EOF", Read()); @@ -449,7 +448,7 @@ TEST_F(LogTest, BadLengthAtEndIsIgnored) { ASSERT_EQ("", ReportMessage()); } -TEST_F(LogTest, BadLengthAtEndIsNotIgnored) { +TEST_P(LogTest, BadLengthAtEndIsNotIgnored) { Write("foo"); ShrinkSize(1); ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true)); @@ -457,7 +456,7 @@ TEST_F(LogTest, BadLengthAtEndIsNotIgnored) { ASSERT_EQ("OK", MatchError("Corruption: truncated header")); } -TEST_F(LogTest, ChecksumMismatch) { +TEST_P(LogTest, ChecksumMismatch) { Write("foo"); IncrementByte(0, 10); ASSERT_EQ("EOF", Read()); @@ -465,7 +464,7 @@ TEST_F(LogTest, ChecksumMismatch) { ASSERT_EQ("OK", MatchError("checksum mismatch")); } -TEST_F(LogTest, UnexpectedMiddleType) { +TEST_P(LogTest, UnexpectedMiddleType) { Write("foo"); SetByte(6, kMiddleType); FixChecksum(0, 3); @@ -474,7 +473,7 @@ TEST_F(LogTest, UnexpectedMiddleType) { ASSERT_EQ("OK", MatchError("missing start")); } -TEST_F(LogTest, UnexpectedLastType) { +TEST_P(LogTest, UnexpectedLastType) { Write("foo"); SetByte(6, kLastType); FixChecksum(0, 3); @@ -483,7 +482,7 @@ TEST_F(LogTest, UnexpectedLastType) { ASSERT_EQ("OK", MatchError("missing start")); } -TEST_F(LogTest, UnexpectedFullType) { +TEST_P(LogTest, UnexpectedFullType) { Write("foo"); Write("bar"); SetByte(6, kFirstType); @@ -494,7 +493,7 @@ TEST_F(LogTest, UnexpectedFullType) { ASSERT_EQ("OK", MatchError("partial record without end")); } -TEST_F(LogTest, UnexpectedFirstType) { +TEST_P(LogTest, UnexpectedFirstType) { Write("foo"); Write(BigString("bar", 100000)); SetByte(6, kFirstType); @@ -505,7 +504,7 @@ TEST_F(LogTest, UnexpectedFirstType) { ASSERT_EQ("OK", MatchError("partial record without end")); } -TEST_F(LogTest, MissingLastIsIgnored) { +TEST_P(LogTest, MissingLastIsIgnored) { Write(BigString("bar", kBlockSize)); // Remove the LAST block, including header. ShrinkSize(14); @@ -514,7 +513,7 @@ TEST_F(LogTest, MissingLastIsIgnored) { ASSERT_EQ(0U, DroppedBytes()); } -TEST_F(LogTest, MissingLastIsNotIgnored) { +TEST_P(LogTest, MissingLastIsNotIgnored) { Write(BigString("bar", kBlockSize)); // Remove the LAST block, including header. ShrinkSize(14); @@ -523,7 +522,7 @@ TEST_F(LogTest, MissingLastIsNotIgnored) { ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data")); } -TEST_F(LogTest, PartialLastIsIgnored) { +TEST_P(LogTest, PartialLastIsIgnored) { Write(BigString("bar", kBlockSize)); // Cause a bad record length in the LAST block. ShrinkSize(1); @@ -532,7 +531,7 @@ TEST_F(LogTest, PartialLastIsIgnored) { ASSERT_EQ(0U, DroppedBytes()); } -TEST_F(LogTest, PartialLastIsNotIgnored) { +TEST_P(LogTest, PartialLastIsNotIgnored) { Write(BigString("bar", kBlockSize)); // Cause a bad record length in the LAST block. ShrinkSize(1); @@ -543,7 +542,7 @@ TEST_F(LogTest, PartialLastIsNotIgnored) { "error reading trailing data")); } -TEST_F(LogTest, ErrorJoinsRecords) { +TEST_P(LogTest, ErrorJoinsRecords) { // Consider two fragmented records: // first(R1) last(R1) first(R2) last(R2) // where the middle two fragments disappear. We do not want @@ -566,43 +565,43 @@ TEST_F(LogTest, ErrorJoinsRecords) { ASSERT_GE(dropped, 2 * kBlockSize); } -TEST_F(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); } +TEST_P(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); } -TEST_F(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); } +TEST_P(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); } -TEST_F(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); } +TEST_P(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); } -TEST_F(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); } +TEST_P(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); } -TEST_F(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); } +TEST_P(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); } -TEST_F(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); } +TEST_P(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); } -TEST_F(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); } +TEST_P(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); } -TEST_F(LogTest, ReadFourthFirstBlockTrailer) { +TEST_P(LogTest, ReadFourthFirstBlockTrailer) { CheckInitialOffsetRecord(log::kBlockSize - 4, 3); } -TEST_F(LogTest, ReadFourthMiddleBlock) { +TEST_P(LogTest, ReadFourthMiddleBlock) { CheckInitialOffsetRecord(log::kBlockSize + 1, 3); } -TEST_F(LogTest, ReadFourthLastBlock) { +TEST_P(LogTest, ReadFourthLastBlock) { CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3); } -TEST_F(LogTest, ReadFourthStart) { +TEST_P(LogTest, ReadFourthStart) { CheckInitialOffsetRecord( 2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize, 3); } -TEST_F(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); } +TEST_P(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); } -TEST_F(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); } +TEST_P(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); } -TEST_F(LogTest, ClearEofSingleBlock) { +TEST_P(LogTest, ClearEofSingleBlock) { Write("foo"); Write("bar"); ForceEOF(3 + kHeaderSize + 2); @@ -617,7 +616,7 @@ TEST_F(LogTest, ClearEofSingleBlock) { ASSERT_TRUE(IsEOF()); } -TEST_F(LogTest, ClearEofMultiBlock) { +TEST_P(LogTest, ClearEofMultiBlock) { size_t num_full_blocks = 5; size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25; Write(BigString("foo", n)); @@ -634,7 +633,7 @@ TEST_F(LogTest, ClearEofMultiBlock) { ASSERT_TRUE(IsEOF()); } -TEST_F(LogTest, ClearEofError) { +TEST_P(LogTest, ClearEofError) { // If an error occurs during Read() in UnmarkEOF(), the records contained // in the buffer should be returned on subsequent calls of ReadRecord() // until no more full records are left, whereafter ReadRecord() should return @@ -652,7 +651,7 @@ TEST_F(LogTest, ClearEofError) { ASSERT_EQ("EOF", Read()); } -TEST_F(LogTest, ClearEofError2) { +TEST_P(LogTest, ClearEofError2) { Write("foo"); Write("bar"); UnmarkEOF(); @@ -666,6 +665,8 @@ TEST_F(LogTest, ClearEofError2) { ASSERT_EQ("OK", MatchError("read error")); } +INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2)); + } // namespace log } // namespace rocksdb diff --git a/db/log_writer.cc b/db/log_writer.cc index 32d4afdc9..9bef090f1 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -18,8 +18,12 @@ namespace rocksdb { namespace log { -Writer::Writer(unique_ptr&& dest) - : dest_(std::move(dest)), block_offset_(0) { +Writer::Writer(unique_ptr&& dest, + uint64_t log_number, bool recycle_log_files) + : dest_(std::move(dest)), + block_offset_(0), + log_number_(log_number), + recycle_log_files_(recycle_log_files) { for (int i = 0; i <= kMaxRecordType; i++) { char t = static_cast(i); type_crc_[i] = crc32c::Value(&t, 1); diff --git a/db/log_writer.h b/db/log_writer.h index 6b59bbdd5..a8aa5d476 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -63,7 +63,8 @@ class Writer { // Create a writer that will append data to "*dest". // "*dest" must be initially empty. // "*dest" must remain live while this Writer is in use. - explicit Writer(unique_ptr&& dest); + explicit Writer(unique_ptr&& dest, + uint64_t log_number, bool recycle_log_files); ~Writer(); Status AddRecord(const Slice& slice); @@ -74,6 +75,8 @@ class Writer { private: unique_ptr dest_; int block_offset_; // Current offset in block + uint64_t log_number_; + bool recycle_log_files_; // crc32c values for all supported record types. These are // pre-computed to reduce the overhead of computing the crc of the diff --git a/db/repair.cc b/db/repair.cc index ba63850be..d5375f5dd 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -413,7 +413,7 @@ class Repairer { { unique_ptr file_writer( new WritableFileWriter(std::move(file), env_options)); - log::Writer log(std::move(file_writer)); + log::Writer log(std::move(file_writer), 0, false); std::string record; edit_->EncodeTo(&record); status = log.AddRecord(record); diff --git a/db/version_set.cc b/db/version_set.cc index 5ddecbb53..146e93ad0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2112,7 +2112,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, unique_ptr file_writer( new WritableFileWriter(std::move(descriptor_file), opt_env_opts)); - descriptor_log_.reset(new log::Writer(std::move(file_writer))); + descriptor_log_.reset(new log::Writer(std::move(file_writer), 0, false)); s = WriteSnapshot(descriptor_log_.get()); } } diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index ec56c9632..428f4c954 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -77,7 +77,7 @@ class WalManagerTest : public testing::Test { ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_)); unique_ptr file_writer( new WritableFileWriter(std::move(file), env_options_)); - current_log_writer_.reset(new log::Writer(std::move(file_writer))); + current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false)); } void CreateArchiveLogs(int num_logs, int entries_per_log) { @@ -127,7 +127,8 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) { unique_ptr file_writer( new WritableFileWriter(std::move(file), EnvOptions())); - log::Writer writer(std::move(file_writer)); + log::Writer writer(std::move(file_writer), 1, + db_options_.recycle_log_file_num > 0); WriteBatch batch; batch.Put("foo", "bar"); WriteBatchInternal::SetSequence(&batch, 10);