Merge pull request #746 from ceph/wip-recycle

Add Options.recycle_log_file_num for Recycling WAL Files
main
Siying Dong 9 years ago
commit 138876a62c
  1. 5
      db/c.cc
  2. 2
      db/compaction_job_test.cc
  3. 76
      db/db_impl.cc
  4. 2
      db/db_impl.h
  5. 4
      db/db_test.cc
  6. 4
      db/db_test_util.cc
  7. 7
      db/db_test_util.h
  8. 2
      db/flush_job_test.cc
  9. 14
      db/log_format.h
  10. 161
      db/log_reader.cc
  11. 27
      db/log_reader.h
  12. 216
      db/log_test.cc
  13. 68
      db/log_writer.cc
  14. 16
      db/log_writer.h
  15. 6
      db/repair.cc
  16. 6
      db/transaction_log_impl.cc
  17. 17
      db/version_set.cc
  18. 4
      db/wal_manager.cc
  19. 5
      db/wal_manager_test.cc
  20. 2
      include/rocksdb/c.h
  21. 6
      include/rocksdb/env.h
  22. 10
      include/rocksdb/options.h
  23. 53
      java/rocksjni/options.cc
  24. 16
      tools/ldb_cmd.cc
  25. 11
      util/env.cc
  26. 44
      util/env_posix.cc
  27. 4
      util/options.cc
  28. 3
      util/options_helper.h
  29. 6
      util/options_test.cc

@ -1687,6 +1687,11 @@ void rocksdb_options_set_keep_log_file_num(rocksdb_options_t* opt, size_t v) {
opt->rep.keep_log_file_num = v; opt->rep.keep_log_file_num = v;
} }
void rocksdb_options_set_recycle_log_file_num(rocksdb_options_t* opt,
size_t v) {
opt->rep.recycle_log_file_num = v;
}
void rocksdb_options_set_soft_rate_limit(rocksdb_options_t* opt, double v) { void rocksdb_options_set_soft_rate_limit(rocksdb_options_t* opt, double v) {
opt->rep.soft_rate_limit = v; opt->rep.soft_rate_limit = v;
} }

@ -198,7 +198,7 @@ class CompactionJobTest : public testing::Test {
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options_)); new WritableFileWriter(std::move(file), env_options_));
{ {
log::Writer log(std::move(file_writer)); log::Writer log(std::move(file_writer), 0, false);
std::string record; std::string record;
new_db.EncodeTo(&record); new_db.EncodeTo(&record);
s = log.AddRecord(record); s = log.AddRecord(record);

@ -150,6 +150,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
} }
} }
if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
result.recycle_log_file_num = false;
}
if (result.wal_dir.empty()) { if (result.wal_dir.empty()) {
// Use dbname as default // Use dbname as default
result.wal_dir = dbname; result.wal_dir = dbname;
@ -416,7 +420,7 @@ Status DBImpl::NewDB() {
file->SetPreallocationBlockSize(db_options_.manifest_preallocation_size); file->SetPreallocationBlockSize(db_options_.manifest_preallocation_size);
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options)); new WritableFileWriter(std::move(file), env_options));
log::Writer log(std::move(file_writer)); log::Writer log(std::move(file_writer), 0, false);
std::string record; std::string record;
new_db.EncodeTo(&record); new_db.EncodeTo(&record);
s = log.AddRecord(record); s = log.AddRecord(record);
@ -601,7 +605,13 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// find newly obsoleted log files // find newly obsoleted log files
while (alive_log_files_.begin()->number < min_log_number) { while (alive_log_files_.begin()->number < min_log_number) {
auto& earliest = *alive_log_files_.begin(); auto& earliest = *alive_log_files_.begin();
job_context->log_delete_files.push_back(earliest.number); if (db_options_.recycle_log_file_num > log_recycle_files.size()) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"adding log %" PRIu64 " to recycle list\n", earliest.number);
log_recycle_files.push_back(earliest.number);
} else {
job_context->log_delete_files.push_back(earliest.number);
}
total_log_size_ -= earliest.size; total_log_size_ -= earliest.size;
alive_log_files_.pop_front(); alive_log_files_.pop_front();
// Current log should always stay alive since it can't have // Current log should always stay alive since it can't have
@ -1109,28 +1119,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// paranoid_checks==false so that corruptions cause entire commits // paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly // to be skipped instead of propagating bad information (like overly
// large sequence numbers). // large sequence numbers).
log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/, log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
0 /*initial_offset*/); true /*checksum*/, 0 /*initial_offset*/, log_number);
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number, "Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number,
db_options_.wal_recovery_mode, !continue_replay_log); db_options_.wal_recovery_mode, !continue_replay_log);
// Determine if we should tolerate incomplete records at the tail end of the // Determine if we should tolerate incomplete records at the tail end of the
// log
bool report_eof_inconsistency;
if (db_options_.wal_recovery_mode ==
WALRecoveryMode::kAbsoluteConsistency) {
// in clean shutdown we don't expect any error in the log files
report_eof_inconsistency = true;
} else {
// for other modes ignore only incomplete records in the last log file
// which is presumably due to write in progress during restart
report_eof_inconsistency = false;
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
}
// Read all the records and add to a memtable // Read all the records and add to a memtable
std::string scratch; std::string scratch;
Slice record; Slice record;
@ -1145,9 +1140,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
} }
} }
while (continue_replay_log && while (
reader.ReadRecord(&record, &scratch, report_eof_inconsistency) && continue_replay_log &&
status.ok()) { reader.ReadRecord(&record, &scratch, db_options_.wal_recovery_mode) &&
status.ok()) {
if (record.size() < 12) { if (record.size() < 12) {
reporter.Corruption(record.size(), reporter.Corruption(record.size(),
Status::Corruption("log record too small")); Status::Corruption("log record too small"));
@ -4072,6 +4068,12 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
// Do this without holding the dbmutex lock. // Do this without holding the dbmutex lock.
assert(versions_->prev_log_number() == 0); assert(versions_->prev_log_number() == 0);
bool creating_new_log = !log_empty_; bool creating_new_log = !log_empty_;
uint64_t recycle_log_number = 0;
if (creating_new_log && db_options_.recycle_log_file_num &&
!log_recycle_files.empty()) {
recycle_log_number = log_recycle_files.front();
log_recycle_files.pop_front();
}
uint64_t new_log_number = uint64_t new_log_number =
creating_new_log ? versions_->NewFileNumber() : logfile_number_; creating_new_log ? versions_->NewFileNumber() : logfile_number_;
SuperVersion* new_superversion = nullptr; SuperVersion* new_superversion = nullptr;
@ -4082,17 +4084,28 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
if (creating_new_log) { if (creating_new_log) {
EnvOptions opt_env_opt = EnvOptions opt_env_opt =
env_->OptimizeForLogWrite(env_options_, db_options_); env_->OptimizeForLogWrite(env_options_, db_options_);
s = NewWritableFile(env_, if (recycle_log_number) {
LogFileName(db_options_.wal_dir, new_log_number), Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
&lfile, opt_env_opt); "reusing log %" PRIu64 " from recycle list\n", recycle_log_number);
s = env_->ReuseWritableFile(
LogFileName(db_options_.wal_dir, new_log_number),
LogFileName(db_options_.wal_dir, recycle_log_number), &lfile,
opt_env_opt);
} else {
s = NewWritableFile(env_,
LogFileName(db_options_.wal_dir, new_log_number),
&lfile, opt_env_opt);
}
if (s.ok()) { if (s.ok()) {
// Our final size should be less than write_buffer_size // Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution. // (compression, etc) but err on the side of caution.
lfile->SetPreallocationBlockSize( lfile->SetPreallocationBlockSize(1.1 *
1.1 * mutable_cf_options.write_buffer_size); mutable_cf_options.write_buffer_size);
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), opt_env_opt)); new WritableFileWriter(std::move(lfile), opt_env_opt));
new_log = new log::Writer(std::move(file_writer)); new_log = new log::Writer(std::move(file_writer),
new_log_number,
db_options_.recycle_log_file_num > 0);
} }
} }
@ -4731,8 +4744,11 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
impl->logfile_number_ = new_log_number; impl->logfile_number_ = new_log_number;
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), opt_env_options)); new WritableFileWriter(std::move(lfile), opt_env_options));
impl->logs_.emplace_back(new_log_number, impl->logs_.emplace_back(
new log::Writer(std::move(file_writer))); new_log_number,
new log::Writer(std::move(file_writer),
new_log_number,
impl->db_options_.recycle_log_file_num > 0));
// set column family handles // set column family handles
for (auto cf : column_families) { for (auto cf : column_families) {

@ -556,6 +556,8 @@ class DBImpl : public DB {
// * whenever there is an error in background flush or compaction // * whenever there is an error in background flush or compaction
InstrumentedCondVar bg_cv_; InstrumentedCondVar bg_cv_;
uint64_t logfile_number_; uint64_t logfile_number_;
std::deque<uint64_t>
log_recycle_files; // a list of log files that we can recycle
bool log_dir_synced_; bool log_dir_synced_;
bool log_empty_; bool log_empty_;
ColumnFamilyHandleImpl* default_cf_handle_; ColumnFamilyHandleImpl* default_cf_handle_;

@ -5072,7 +5072,9 @@ class RecoveryTestHelper {
ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options)); ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options)); new WritableFileWriter(std::move(file), env_options));
current_log_writer.reset(new log::Writer(std::move(file_writer))); current_log_writer.reset(new log::Writer(
std::move(file_writer), current_log_number,
db_options.recycle_log_file_num > 0));
for (int i = 0; i < kKeysPerWALFile; i++) { for (int i = 0; i < kKeysPerWALFile; i++) {
std::string key = "key" + ToString(count++); std::string key = "key" + ToString(count++);

@ -340,6 +340,10 @@ Options DBTestBase::CurrentOptions(
options.row_cache = NewLRUCache(1024 * 1024); options.row_cache = NewLRUCache(1024 * 1024);
break; break;
} }
case kRecycleLogFiles: {
options.recycle_log_file_num = 2;
break;
}
case kLevelSubcompactions: { case kLevelSubcompactions: {
options.max_subcompactions = 4; options.max_subcompactions = 4;
break; break;

@ -437,9 +437,10 @@ class DBTestBase : public testing::Test {
kFIFOCompaction = 26, kFIFOCompaction = 26,
kOptimizeFiltersForHits = 27, kOptimizeFiltersForHits = 27,
kRowCache = 28, kRowCache = 28,
kLevelSubcompactions = 29, kRecycleLogFiles = 29,
kUniversalSubcompactions = 30, kLevelSubcompactions = 30,
kEnd = 29 kUniversalSubcompactions = 31,
kEnd = 30
}; };
int option_config_; int option_config_;

@ -61,7 +61,7 @@ class FlushJobTest : public testing::Test {
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), EnvOptions())); new WritableFileWriter(std::move(file), EnvOptions()));
{ {
log::Writer log(std::move(file_writer)); log::Writer log(std::move(file_writer), 0, false);
std::string record; std::string record;
new_db.EncodeTo(&record); new_db.EncodeTo(&record);
s = log.AddRecord(record); s = log.AddRecord(record);

@ -22,14 +22,24 @@ enum RecordType {
// For fragments // For fragments
kFirstType = 2, kFirstType = 2,
kMiddleType = 3, kMiddleType = 3,
kLastType = 4 kLastType = 4,
// For recycled log files
kRecyclableFullType = 5,
kRecyclableFirstType = 6,
kRecyclableMiddleType = 7,
kRecyclableLastType = 8,
}; };
static const int kMaxRecordType = kLastType; static const int kMaxRecordType = kRecyclableLastType;
static const unsigned int kBlockSize = 32768; static const unsigned int kBlockSize = 32768;
// Header is checksum (4 bytes), type (1 byte), length (2 bytes). // Header is checksum (4 bytes), type (1 byte), length (2 bytes).
static const int kHeaderSize = 4 + 1 + 2; static const int kHeaderSize = 4 + 1 + 2;
// Recyclable header is checksum (4 bytes), type (1 byte), log number
// (4 bytes), length (2 bytes).
static const int kRecyclableHeaderSize = 4 + 1 + 4 + 2;
} // namespace log } // namespace log
} // namespace rocksdb } // namespace rocksdb

@ -21,9 +21,12 @@ namespace log {
Reader::Reporter::~Reporter() { Reader::Reporter::~Reporter() {
} }
Reader::Reader(unique_ptr<SequentialFileReader>&& _file, Reporter* reporter, Reader::Reader(std::shared_ptr<Logger> info_log,
bool checksum, uint64_t initial_offset) unique_ptr<SequentialFileReader>&& _file,
: file_(std::move(_file)), Reporter* reporter, bool checksum, uint64_t initial_offset,
uint64_t log_num)
: info_log_(info_log),
file_(std::move(_file)),
reporter_(reporter), reporter_(reporter),
checksum_(checksum), checksum_(checksum),
backing_store_(new char[kBlockSize]), backing_store_(new char[kBlockSize]),
@ -33,7 +36,8 @@ Reader::Reader(unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
eof_offset_(0), eof_offset_(0),
last_record_offset_(0), last_record_offset_(0),
end_of_buffer_offset_(0), end_of_buffer_offset_(0),
initial_offset_(initial_offset) {} initial_offset_(initial_offset),
log_number_(log_num) {}
Reader::~Reader() { Reader::~Reader() {
delete[] backing_store_; delete[] backing_store_;
@ -62,8 +66,15 @@ bool Reader::SkipToInitialBlock() {
return true; return true;
} }
// For kAbsoluteConsistency, on clean shutdown we don't expect any error
// in the log files. For other modes, we can ignore only incomplete records
// in the last log file, which are presumably due to a write in progress
// during restart (or from log recycling).
//
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
bool Reader::ReadRecord(Slice* record, std::string* scratch, bool Reader::ReadRecord(Slice* record, std::string* scratch,
const bool report_eof_inconsistency) { WALRecoveryMode wal_recovery_mode) {
if (last_record_offset_ < initial_offset_) { if (last_record_offset_ < initial_offset_) {
if (!SkipToInitialBlock()) { if (!SkipToInitialBlock()) {
return false; return false;
@ -80,10 +91,11 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
Slice fragment; Slice fragment;
while (true) { while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
const unsigned int record_type = size_t drop_size;
ReadPhysicalRecord(&fragment, report_eof_inconsistency); const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
switch (record_type) { switch (record_type) {
case kFullType: case kFullType:
case kRecyclableFullType:
if (in_fragmented_record && !scratch->empty()) { if (in_fragmented_record && !scratch->empty()) {
// Handle bug in earlier versions of log::Writer where // Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end // it could emit an empty kFirstType record at the tail end
@ -98,6 +110,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
return true; return true;
case kFirstType: case kFirstType:
case kRecyclableFirstType:
if (in_fragmented_record && !scratch->empty()) { if (in_fragmented_record && !scratch->empty()) {
// Handle bug in earlier versions of log::Writer where // Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end // it could emit an empty kFirstType record at the tail end
@ -111,6 +124,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
break; break;
case kMiddleType: case kMiddleType:
case kRecyclableMiddleType:
if (!in_fragmented_record) { if (!in_fragmented_record) {
ReportCorruption(fragment.size(), ReportCorruption(fragment.size(),
"missing start of fragmented record(1)"); "missing start of fragmented record(1)");
@ -120,6 +134,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
break; break;
case kLastType: case kLastType:
case kRecyclableLastType:
if (!in_fragmented_record) { if (!in_fragmented_record) {
ReportCorruption(fragment.size(), ReportCorruption(fragment.size(),
"missing start of fragmented record(2)"); "missing start of fragmented record(2)");
@ -131,9 +146,17 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
} }
break; break;
case kBadHeader:
if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
// in clean shutdown we don't expect any error in the log files
ReportCorruption(drop_size, "truncated header");
}
// fall-thru
case kEof: case kEof:
if (in_fragmented_record) { if (in_fragmented_record) {
if (report_eof_inconsistency) { if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
// in clean shutdown we don't expect any error in the log files
ReportCorruption(scratch->size(), "error reading trailing data"); ReportCorruption(scratch->size(), "error reading trailing data");
} }
// This can be caused by the writer dying immediately after // This can be caused by the writer dying immediately after
@ -143,6 +166,23 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
} }
return false; return false;
case kOldRecord:
if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
// Treat a record from a previous instance of the log as EOF.
if (in_fragmented_record) {
if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
// in clean shutdown we don't expect any error in the log files
ReportCorruption(scratch->size(), "error reading trailing data");
}
// This can be caused by the writer dying immediately after
// writing a physical record but before completing the next; don't
// treat it as a corruption, just ignore the entire logical record.
scratch->clear();
}
return false;
}
// fall-thru
case kBadRecord: case kBadRecord:
if (in_fragmented_record) { if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record"); ReportCorruption(scratch->size(), "error in middle of record");
@ -244,36 +284,49 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
} }
} }
unsigned int Reader::ReadPhysicalRecord(Slice* result, bool Reader::ReadMore(size_t* drop_size, int *error) {
const bool report_eof_inconsistency) { if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
read_error_ = true;
*error = kEof;
return false;
} else if (buffer_.size() < (size_t)kBlockSize) {
eof_ = true;
eof_offset_ = buffer_.size();
}
return true;
} else {
// Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the
// middle of writing the header. Unless explicitly requested we don't
// considering this an error, just report EOF.
if (buffer_.size()) {
*drop_size = buffer_.size();
buffer_.clear();
*error = kBadHeader;
return false;
}
buffer_.clear();
*error = kEof;
return false;
}
}
unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
while (true) { while (true) {
// We need at least the minimum header size
if (buffer_.size() < (size_t)kHeaderSize) { if (buffer_.size() < (size_t)kHeaderSize) {
if (!eof_ && !read_error_) { int r;
// Last read was a full read, so this is a trailer to skip if (!ReadMore(drop_size, &r)) {
buffer_.clear(); return r;
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
read_error_ = true;
return kEof;
} else if (buffer_.size() < (size_t)kBlockSize) {
eof_ = true;
eof_offset_ = buffer_.size();
}
continue;
} else {
// Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the
// middle of writing the header. Unless explicitly requested we don't
// considering this an error, just report EOF.
if (buffer_.size() && report_eof_inconsistency) {
ReportCorruption(buffer_.size(), "truncated header");
}
buffer_.clear();
return kEof;
} }
continue;
} }
// Parse the header // Parse the header
@ -282,18 +335,34 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result,
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff; const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
const unsigned int type = header[6]; const unsigned int type = header[6];
const uint32_t length = a | (b << 8); const uint32_t length = a | (b << 8);
if (kHeaderSize + length > buffer_.size()) { int header_size = kHeaderSize;
size_t drop_size = buffer_.size(); if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
header_size = kRecyclableHeaderSize;
// We need enough for the larger header
if (buffer_.size() < (size_t)kRecyclableHeaderSize) {
int r;
if (!ReadMore(drop_size, &r)) {
return r;
}
continue;
}
const uint32_t log_num = DecodeFixed32(header + 7);
if (log_num != log_number_) {
return kOldRecord;
}
}
if (header_size + length > buffer_.size()) {
*drop_size = buffer_.size();
buffer_.clear(); buffer_.clear();
if (!eof_) { if (!eof_) {
ReportCorruption(drop_size, "bad record length"); ReportCorruption(*drop_size, "bad record length");
return kBadRecord; return kBadRecord;
} }
// If the end of the file has been reached without reading |length| bytes // If the end of the file has been reached without reading |length| bytes
// of payload, assume the writer died in the middle of writing the record. // of payload, assume the writer died in the middle of writing the record.
// Don't report a corruption unless requested. // Don't report a corruption unless requested.
if (drop_size && report_eof_inconsistency) { if (*drop_size) {
ReportCorruption(drop_size, "truncated header"); return kBadHeader;
} }
return kEof; return kEof;
} }
@ -311,29 +380,29 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result,
// Check crc // Check crc
if (checksum_) { if (checksum_) {
uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
uint32_t actual_crc = crc32c::Value(header + 6, 1 + length); uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
if (actual_crc != expected_crc) { if (actual_crc != expected_crc) {
// Drop the rest of the buffer since "length" itself may have // Drop the rest of the buffer since "length" itself may have
// been corrupted and if we trust it, we could find some // been corrupted and if we trust it, we could find some
// fragment of a real log record that just happens to look // fragment of a real log record that just happens to look
// like a valid log record. // like a valid log record.
size_t drop_size = buffer_.size(); *drop_size = buffer_.size();
buffer_.clear(); buffer_.clear();
ReportCorruption(drop_size, "checksum mismatch"); ReportCorruption(*drop_size, "checksum mismatch");
return kBadRecord; return kBadRecord;
} }
} }
buffer_.remove_prefix(kHeaderSize + length); buffer_.remove_prefix(header_size + length);
// Skip physical record that started before initial_offset_ // Skip physical record that started before initial_offset_
if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length < if (end_of_buffer_offset_ - buffer_.size() - header_size - length <
initial_offset_) { initial_offset_) {
result->clear(); result->clear();
return kBadRecord; return kBadRecord;
} }
*result = Slice(header + kHeaderSize, length); *result = Slice(header + header_size, length);
return type; return type;
} }
} }

@ -14,10 +14,12 @@
#include "db/log_format.h" #include "db/log_format.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/options.h"
namespace rocksdb { namespace rocksdb {
class SequentialFileReader; class SequentialFileReader;
class Logger;
using std::unique_ptr; using std::unique_ptr;
namespace log { namespace log {
@ -51,8 +53,10 @@ class Reader {
// //
// The Reader will start reading at the first record located at physical // The Reader will start reading at the first record located at physical
// position >= initial_offset within the file. // position >= initial_offset within the file.
Reader(unique_ptr<SequentialFileReader>&& file, Reporter* reporter, Reader(std::shared_ptr<Logger> info_log,
bool checksum, uint64_t initial_offset); unique_ptr<SequentialFileReader>&& file,
Reporter* reporter, bool checksum, uint64_t initial_offset,
uint64_t log_num);
~Reader(); ~Reader();
@ -62,7 +66,8 @@ class Reader {
// will only be valid until the next mutating operation on this // will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch. // reader or the next mutation to *scratch.
bool ReadRecord(Slice* record, std::string* scratch, bool ReadRecord(Slice* record, std::string* scratch,
bool report_eof_inconsistency = false); WALRecoveryMode wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords);
// Returns the physical offset of the last record returned by ReadRecord. // Returns the physical offset of the last record returned by ReadRecord.
// //
@ -84,6 +89,7 @@ class Reader {
SequentialFileReader* file() { return file_.get(); } SequentialFileReader* file() { return file_.get(); }
private: private:
std::shared_ptr<Logger> info_log_;
const unique_ptr<SequentialFileReader> file_; const unique_ptr<SequentialFileReader> file_;
Reporter* const reporter_; Reporter* const reporter_;
bool const checksum_; bool const checksum_;
@ -104,6 +110,9 @@ class Reader {
// Offset at which to start looking for the first record to return // Offset at which to start looking for the first record to return
uint64_t const initial_offset_; uint64_t const initial_offset_;
// which log number this is
uint64_t const log_number_;
// Extend record types with the following special values // Extend record types with the following special values
enum { enum {
kEof = kMaxRecordType + 1, kEof = kMaxRecordType + 1,
@ -112,7 +121,11 @@ class Reader {
// * The record has an invalid CRC (ReadPhysicalRecord reports a drop) // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
// * The record is a 0-length record (No drop is reported) // * The record is a 0-length record (No drop is reported)
// * The record is below constructor's initial_offset (No drop is reported) // * The record is below constructor's initial_offset (No drop is reported)
kBadRecord = kMaxRecordType + 2 kBadRecord = kMaxRecordType + 2,
// Returned when we fail to read a valid header.
kBadHeader = kMaxRecordType + 3,
// Returned when we read an old record from a previous user of the log.
kOldRecord = kMaxRecordType + 4,
}; };
// Skips all blocks that are completely before "initial_offset_". // Skips all blocks that are completely before "initial_offset_".
@ -121,8 +134,10 @@ class Reader {
bool SkipToInitialBlock(); bool SkipToInitialBlock();
// Return type, or one of the preceding special values // Return type, or one of the preceding special values
unsigned int ReadPhysicalRecord(Slice* result, unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size);
bool report_eof_inconsistency = false);
// Read some more
bool ReadMore(size_t* drop_size, int *error);
// Reports dropped bytes to the reporter. // Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation. // buffer_ must be updated to remove the dropped bytes prior to invocation.

@ -43,7 +43,7 @@ static std::string RandomSkewedString(int i, Random* rnd) {
return BigString(NumberString(i), rnd->Skewed(17)); return BigString(NumberString(i), rnd->Skewed(17));
} }
class LogTest : public testing::Test { class LogTest : public ::testing::TestWithParam<int> {
private: private:
class StringSource : public SequentialFile { class StringSource : public SequentialFile {
public: public:
@ -153,19 +153,26 @@ class LogTest : public testing::Test {
// Record metadata for testing initial offset functionality // Record metadata for testing initial offset functionality
static size_t initial_offset_record_sizes_[]; static size_t initial_offset_record_sizes_[];
static uint64_t initial_offset_last_record_offsets_[]; uint64_t initial_offset_last_record_offsets_[4];
public: public:
LogTest() LogTest()
: reader_contents_(), : reader_contents_(),
dest_holder_( dest_holder_(test::GetWritableFileWriter(
test::GetWritableFileWriter( new test::StringSink(&reader_contents_))),
new test::StringSink(&reader_contents_))),
source_holder_( source_holder_(
test::GetSequentialFileReader(new StringSource(reader_contents_))), test::GetSequentialFileReader(new StringSource(reader_contents_))),
writer_(std::move(dest_holder_)), writer_(std::move(dest_holder_), 123, GetParam()),
reader_(std::move(source_holder_), &report_, true /*checksum*/, reader_(NULL, std::move(source_holder_), &report_, true /*checksum*/,
0 /*initial_offset*/) {} 0 /*initial_offset*/, 123) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
initial_offset_last_record_offsets_[0] = 0;
initial_offset_last_record_offsets_[1] = header_size + 10000;
initial_offset_last_record_offsets_[2] = 2 * (header_size + 10000);
initial_offset_last_record_offsets_[3] = 2 * (header_size + 10000) +
(2 * log::kBlockSize - 1000) +
3 * header_size;
}
void Write(const std::string& msg) { void Write(const std::string& msg) {
writer_.AddRecord(Slice(msg)); writer_.AddRecord(Slice(msg));
@ -175,10 +182,11 @@ class LogTest : public testing::Test {
return dest_contents().size(); return dest_contents().size();
} }
std::string Read(const bool report_eof_inconsistency = false) { std::string Read(const WALRecoveryMode wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords) {
std::string scratch; std::string scratch;
Slice record; Slice record;
if (reader_.ReadRecord(&record, &scratch, report_eof_inconsistency)) { if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) {
return record.ToString(); return record.ToString();
} else { } else {
return "EOF"; return "EOF";
@ -200,9 +208,11 @@ class LogTest : public testing::Test {
dest->Drop(bytes); dest->Drop(bytes);
} }
void FixChecksum(int header_offset, int len) { void FixChecksum(int header_offset, int len, bool recyclable) {
// Compute crc of type/len/data // Compute crc of type/len/data
uint32_t crc = crc32c::Value(&dest_contents()[header_offset+6], 1 + len); int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize;
uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6],
header_size - 6 + len);
crc = crc32c::Mask(crc); crc = crc32c::Mask(crc);
EncodeFixed32(&dest_contents()[header_offset], crc); EncodeFixed32(&dest_contents()[header_offset], crc);
} }
@ -259,8 +269,8 @@ class LogTest : public testing::Test {
unique_ptr<SequentialFileReader> file_reader( unique_ptr<SequentialFileReader> file_reader(
test::GetSequentialFileReader(new StringSource(reader_contents_))); test::GetSequentialFileReader(new StringSource(reader_contents_)));
unique_ptr<Reader> offset_reader( unique_ptr<Reader> offset_reader(
new Reader(std::move(file_reader), &report_, true /*checksum*/, new Reader(NULL, std::move(file_reader), &report_,
WrittenBytes() + offset_past_end)); true /*checksum*/, WrittenBytes() + offset_past_end, 123));
Slice record; Slice record;
std::string scratch; std::string scratch;
ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch)); ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
@ -271,8 +281,9 @@ class LogTest : public testing::Test {
WriteInitialOffsetLog(); WriteInitialOffsetLog();
unique_ptr<SequentialFileReader> file_reader( unique_ptr<SequentialFileReader> file_reader(
test::GetSequentialFileReader(new StringSource(reader_contents_))); test::GetSequentialFileReader(new StringSource(reader_contents_)));
unique_ptr<Reader> offset_reader(new Reader( unique_ptr<Reader> offset_reader(
std::move(file_reader), &report_, true /*checksum*/, initial_offset)); new Reader(NULL, std::move(file_reader), &report_,
true /*checksum*/, initial_offset, 123));
Slice record; Slice record;
std::string scratch; std::string scratch;
ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch)); ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
@ -291,16 +302,9 @@ size_t LogTest::initial_offset_record_sizes_[] =
2 * log::kBlockSize - 1000, // Span three blocks 2 * log::kBlockSize - 1000, // Span three blocks
1}; 1};
uint64_t LogTest::initial_offset_last_record_offsets_[] = TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
{0,
kHeaderSize + 10000,
2 * (kHeaderSize + 10000),
2 * (kHeaderSize + 10000) +
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
TEST_F(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
TEST_F(LogTest, ReadWrite) { TEST_P(LogTest, ReadWrite) {
Write("foo"); Write("foo");
Write("bar"); Write("bar");
Write(""); Write("");
@ -313,7 +317,7 @@ TEST_F(LogTest, ReadWrite) {
ASSERT_EQ("EOF", Read()); // Make sure reads at eof work ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
} }
TEST_F(LogTest, ManyBlocks) { TEST_P(LogTest, ManyBlocks) {
for (int i = 0; i < 100000; i++) { for (int i = 0; i < 100000; i++) {
Write(NumberString(i)); Write(NumberString(i));
} }
@ -323,7 +327,7 @@ TEST_F(LogTest, ManyBlocks) {
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
} }
TEST_F(LogTest, Fragmentation) { TEST_P(LogTest, Fragmentation) {
Write("small"); Write("small");
Write(BigString("medium", 50000)); Write(BigString("medium", 50000));
Write(BigString("large", 100000)); Write(BigString("large", 100000));
@ -333,11 +337,12 @@ TEST_F(LogTest, Fragmentation) {
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
} }
TEST_F(LogTest, MarginalTrailer) { TEST_P(LogTest, MarginalTrailer) {
// Make a trailer that is exactly the same length as an empty record. // Make a trailer that is exactly the same length as an empty record.
const int n = kBlockSize - 2*kHeaderSize; int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size;
Write(BigString("foo", n)); Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize), WrittenBytes()); ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
Write(""); Write("");
Write("bar"); Write("bar");
ASSERT_EQ(BigString("foo", n), Read()); ASSERT_EQ(BigString("foo", n), Read());
@ -346,11 +351,12 @@ TEST_F(LogTest, MarginalTrailer) {
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
} }
TEST_F(LogTest, MarginalTrailer2) { TEST_P(LogTest, MarginalTrailer2) {
// Make a trailer that is exactly the same length as an empty record. // Make a trailer that is exactly the same length as an empty record.
const int n = kBlockSize - 2*kHeaderSize; int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size;
Write(BigString("foo", n)); Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize), WrittenBytes()); ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
Write("bar"); Write("bar");
ASSERT_EQ(BigString("foo", n), Read()); ASSERT_EQ(BigString("foo", n), Read());
ASSERT_EQ("bar", Read()); ASSERT_EQ("bar", Read());
@ -359,10 +365,11 @@ TEST_F(LogTest, MarginalTrailer2) {
ASSERT_EQ("", ReportMessage()); ASSERT_EQ("", ReportMessage());
} }
TEST_F(LogTest, ShortTrailer) { TEST_P(LogTest, ShortTrailer) {
const int n = kBlockSize - 2*kHeaderSize + 4; int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size + 4;
Write(BigString("foo", n)); Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize + 4), WrittenBytes()); ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
Write(""); Write("");
Write("bar"); Write("bar");
ASSERT_EQ(BigString("foo", n), Read()); ASSERT_EQ(BigString("foo", n), Read());
@ -371,15 +378,16 @@ TEST_F(LogTest, ShortTrailer) {
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
} }
TEST_F(LogTest, AlignedEof) { TEST_P(LogTest, AlignedEof) {
const int n = kBlockSize - 2*kHeaderSize + 4; int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size + 4;
Write(BigString("foo", n)); Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize + 4), WrittenBytes()); ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
ASSERT_EQ(BigString("foo", n), Read()); ASSERT_EQ(BigString("foo", n), Read());
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
} }
TEST_F(LogTest, RandomRead) { TEST_P(LogTest, RandomRead) {
const int N = 500; const int N = 500;
Random write_rnd(301); Random write_rnd(301);
for (int i = 0; i < N; i++) { for (int i = 0; i < N; i++) {
@ -394,7 +402,7 @@ TEST_F(LogTest, RandomRead) {
// Tests of all the error paths in log_reader.cc follow: // Tests of all the error paths in log_reader.cc follow:
TEST_F(LogTest, ReadError) { TEST_P(LogTest, ReadError) {
Write("foo"); Write("foo");
ForceError(); ForceError();
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
@ -402,17 +410,17 @@ TEST_F(LogTest, ReadError) {
ASSERT_EQ("OK", MatchError("read error")); ASSERT_EQ("OK", MatchError("read error"));
} }
TEST_F(LogTest, BadRecordType) { TEST_P(LogTest, BadRecordType) {
Write("foo"); Write("foo");
// Type is stored in header[6] // Type is stored in header[6]
IncrementByte(6, 100); IncrementByte(6, 100);
FixChecksum(0, 3); FixChecksum(0, 3, false);
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes()); ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("unknown record type")); ASSERT_EQ("OK", MatchError("unknown record type"));
} }
TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) { TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
Write("foo"); Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte ShrinkSize(4); // Drop all payload as well as a header byte
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
@ -421,17 +429,18 @@ TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) {
ASSERT_EQ("", ReportMessage()); ASSERT_EQ("", ReportMessage());
} }
TEST_F(LogTest, TruncatedTrailingRecordIsNotIgnored) { TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
Write("foo"); Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte ShrinkSize(4); // Drop all payload as well as a header byte
ASSERT_EQ("EOF", Read(/*report_eof_inconsistency*/ true)); ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
// Truncated last record is ignored, not treated as an error // Truncated last record is ignored, not treated as an error
ASSERT_GT(DroppedBytes(), 0U); ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError("Corruption: truncated header")); ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
} }
TEST_F(LogTest, BadLength) { TEST_P(LogTest, BadLength) {
const int kPayloadSize = kBlockSize - kHeaderSize; int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
const int kPayloadSize = kBlockSize - header_size;
Write(BigString("bar", kPayloadSize)); Write(BigString("bar", kPayloadSize));
Write("foo"); Write("foo");
// Least significant size byte is stored in header[4]. // Least significant size byte is stored in header[4].
@ -441,7 +450,7 @@ TEST_F(LogTest, BadLength) {
ASSERT_EQ("OK", MatchError("bad record length")); ASSERT_EQ("OK", MatchError("bad record length"));
} }
TEST_F(LogTest, BadLengthAtEndIsIgnored) { TEST_P(LogTest, BadLengthAtEndIsIgnored) {
Write("foo"); Write("foo");
ShrinkSize(1); ShrinkSize(1);
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
@ -449,63 +458,63 @@ TEST_F(LogTest, BadLengthAtEndIsIgnored) {
ASSERT_EQ("", ReportMessage()); ASSERT_EQ("", ReportMessage());
} }
TEST_F(LogTest, BadLengthAtEndIsNotIgnored) { TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
Write("foo"); Write("foo");
ShrinkSize(1); ShrinkSize(1);
ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true)); ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
ASSERT_GT(DroppedBytes(), 0U); ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError("Corruption: truncated header")); ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
} }
TEST_F(LogTest, ChecksumMismatch) { TEST_P(LogTest, ChecksumMismatch) {
Write("foo"); Write("foooooo");
IncrementByte(0, 10); IncrementByte(0, 14);
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
ASSERT_EQ(10U, DroppedBytes()); ASSERT_EQ(14U + 4 * !!GetParam(), DroppedBytes());
ASSERT_EQ("OK", MatchError("checksum mismatch")); ASSERT_EQ("OK", MatchError("checksum mismatch"));
} }
TEST_F(LogTest, UnexpectedMiddleType) { TEST_P(LogTest, UnexpectedMiddleType) {
Write("foo"); Write("foo");
SetByte(6, kMiddleType); SetByte(6, GetParam() ? kRecyclableMiddleType : kMiddleType);
FixChecksum(0, 3); FixChecksum(0, 3, !!GetParam());
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes()); ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("missing start")); ASSERT_EQ("OK", MatchError("missing start"));
} }
TEST_F(LogTest, UnexpectedLastType) { TEST_P(LogTest, UnexpectedLastType) {
Write("foo"); Write("foo");
SetByte(6, kLastType); SetByte(6, GetParam() ? kRecyclableLastType : kLastType);
FixChecksum(0, 3); FixChecksum(0, 3, !!GetParam());
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes()); ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("missing start")); ASSERT_EQ("OK", MatchError("missing start"));
} }
TEST_F(LogTest, UnexpectedFullType) { TEST_P(LogTest, UnexpectedFullType) {
Write("foo"); Write("foo");
Write("bar"); Write("bar");
SetByte(6, kFirstType); SetByte(6, GetParam() ? kRecyclableFirstType : kFirstType);
FixChecksum(0, 3); FixChecksum(0, 3, !!GetParam());
ASSERT_EQ("bar", Read()); ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes()); ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("partial record without end")); ASSERT_EQ("OK", MatchError("partial record without end"));
} }
TEST_F(LogTest, UnexpectedFirstType) { TEST_P(LogTest, UnexpectedFirstType) {
Write("foo"); Write("foo");
Write(BigString("bar", 100000)); Write(BigString("bar", 100000));
SetByte(6, kFirstType); SetByte(6, GetParam() ? kRecyclableFirstType : kFirstType);
FixChecksum(0, 3); FixChecksum(0, 3, !!GetParam());
ASSERT_EQ(BigString("bar", 100000), Read()); ASSERT_EQ(BigString("bar", 100000), Read());
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes()); ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("partial record without end")); ASSERT_EQ("OK", MatchError("partial record without end"));
} }
TEST_F(LogTest, MissingLastIsIgnored) { TEST_P(LogTest, MissingLastIsIgnored) {
Write(BigString("bar", kBlockSize)); Write(BigString("bar", kBlockSize));
// Remove the LAST block, including header. // Remove the LAST block, including header.
ShrinkSize(14); ShrinkSize(14);
@ -514,16 +523,16 @@ TEST_F(LogTest, MissingLastIsIgnored) {
ASSERT_EQ(0U, DroppedBytes()); ASSERT_EQ(0U, DroppedBytes());
} }
TEST_F(LogTest, MissingLastIsNotIgnored) { TEST_P(LogTest, MissingLastIsNotIgnored) {
Write(BigString("bar", kBlockSize)); Write(BigString("bar", kBlockSize));
// Remove the LAST block, including header. // Remove the LAST block, including header.
ShrinkSize(14); ShrinkSize(14);
ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true)); ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
ASSERT_GT(DroppedBytes(), 0U); ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data")); ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
} }
TEST_F(LogTest, PartialLastIsIgnored) { TEST_P(LogTest, PartialLastIsIgnored) {
Write(BigString("bar", kBlockSize)); Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block. // Cause a bad record length in the LAST block.
ShrinkSize(1); ShrinkSize(1);
@ -532,18 +541,18 @@ TEST_F(LogTest, PartialLastIsIgnored) {
ASSERT_EQ(0U, DroppedBytes()); ASSERT_EQ(0U, DroppedBytes());
} }
TEST_F(LogTest, PartialLastIsNotIgnored) { TEST_P(LogTest, PartialLastIsNotIgnored) {
Write(BigString("bar", kBlockSize)); Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block. // Cause a bad record length in the LAST block.
ShrinkSize(1); ShrinkSize(1);
ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true)); ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
ASSERT_GT(DroppedBytes(), 0U); ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError( ASSERT_EQ("OK", MatchError(
"Corruption: truncated headerCorruption: " "Corruption: truncated headerCorruption: "
"error reading trailing data")); "error reading trailing data"));
} }
TEST_F(LogTest, ErrorJoinsRecords) { TEST_P(LogTest, ErrorJoinsRecords) {
// Consider two fragmented records: // Consider two fragmented records:
// first(R1) last(R1) first(R2) last(R2) // first(R1) last(R1) first(R2) last(R2)
// where the middle two fragments disappear. We do not want // where the middle two fragments disappear. We do not want
@ -566,46 +575,60 @@ TEST_F(LogTest, ErrorJoinsRecords) {
ASSERT_GE(dropped, 2 * kBlockSize); ASSERT_GE(dropped, 2 * kBlockSize);
} }
TEST_F(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); } TEST_P(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
TEST_F(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); } TEST_P(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }
TEST_F(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); } TEST_P(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }
TEST_F(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); } TEST_P(LogTest, ReadSecondStart) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
CheckInitialOffsetRecord(10000 + header_size, 1);
}
TEST_F(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); } TEST_P(LogTest, ReadThirdOneOff) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
CheckInitialOffsetRecord(10000 + header_size + 1, 2);
}
TEST_F(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); } TEST_P(LogTest, ReadThirdStart) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
CheckInitialOffsetRecord(20000 + 2 * header_size, 2);
}
TEST_F(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); } TEST_P(LogTest, ReadFourthOneOff) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
CheckInitialOffsetRecord(20000 + 2 * header_size + 1, 3);
}
TEST_F(LogTest, ReadFourthFirstBlockTrailer) { TEST_P(LogTest, ReadFourthFirstBlockTrailer) {
CheckInitialOffsetRecord(log::kBlockSize - 4, 3); CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
} }
TEST_F(LogTest, ReadFourthMiddleBlock) { TEST_P(LogTest, ReadFourthMiddleBlock) {
CheckInitialOffsetRecord(log::kBlockSize + 1, 3); CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
} }
TEST_F(LogTest, ReadFourthLastBlock) { TEST_P(LogTest, ReadFourthLastBlock) {
CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3); CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
} }
TEST_F(LogTest, ReadFourthStart) { TEST_P(LogTest, ReadFourthStart) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
CheckInitialOffsetRecord( CheckInitialOffsetRecord(
2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize, 2 * (header_size + 1000) + (2 * log::kBlockSize - 1000) + 3 * header_size,
3); 3);
} }
TEST_F(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); } TEST_P(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }
TEST_F(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); } TEST_P(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
TEST_F(LogTest, ClearEofSingleBlock) { TEST_P(LogTest, ClearEofSingleBlock) {
Write("foo"); Write("foo");
Write("bar"); Write("bar");
ForceEOF(3 + kHeaderSize + 2); int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
ForceEOF(3 + header_size + 2);
ASSERT_EQ("foo", Read()); ASSERT_EQ("foo", Read());
UnmarkEOF(); UnmarkEOF();
ASSERT_EQ("bar", Read()); ASSERT_EQ("bar", Read());
@ -617,12 +640,13 @@ TEST_F(LogTest, ClearEofSingleBlock) {
ASSERT_TRUE(IsEOF()); ASSERT_TRUE(IsEOF());
} }
TEST_F(LogTest, ClearEofMultiBlock) { TEST_P(LogTest, ClearEofMultiBlock) {
size_t num_full_blocks = 5; size_t num_full_blocks = 5;
size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25; int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
Write(BigString("foo", n)); Write(BigString("foo", n));
Write(BigString("bar", n)); Write(BigString("bar", n));
ForceEOF(n + num_full_blocks * kHeaderSize + 10); ForceEOF(n + num_full_blocks * header_size + header_size + 3);
ASSERT_EQ(BigString("foo", n), Read()); ASSERT_EQ(BigString("foo", n), Read());
ASSERT_TRUE(IsEOF()); ASSERT_TRUE(IsEOF());
UnmarkEOF(); UnmarkEOF();
@ -634,7 +658,7 @@ TEST_F(LogTest, ClearEofMultiBlock) {
ASSERT_TRUE(IsEOF()); ASSERT_TRUE(IsEOF());
} }
TEST_F(LogTest, ClearEofError) { TEST_P(LogTest, ClearEofError) {
// If an error occurs during Read() in UnmarkEOF(), the records contained // If an error occurs during Read() in UnmarkEOF(), the records contained
// in the buffer should be returned on subsequent calls of ReadRecord() // in the buffer should be returned on subsequent calls of ReadRecord()
// until no more full records are left, whereafter ReadRecord() should return // until no more full records are left, whereafter ReadRecord() should return
@ -652,7 +676,7 @@ TEST_F(LogTest, ClearEofError) {
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
} }
TEST_F(LogTest, ClearEofError2) { TEST_P(LogTest, ClearEofError2) {
Write("foo"); Write("foo");
Write("bar"); Write("bar");
UnmarkEOF(); UnmarkEOF();
@ -666,6 +690,8 @@ TEST_F(LogTest, ClearEofError2) {
ASSERT_EQ("OK", MatchError("read error")); ASSERT_EQ("OK", MatchError("read error"));
} }
INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
} // namespace log } // namespace log
} // namespace rocksdb } // namespace rocksdb

@ -18,8 +18,12 @@
namespace rocksdb { namespace rocksdb {
namespace log { namespace log {
Writer::Writer(unique_ptr<WritableFileWriter>&& dest) Writer::Writer(unique_ptr<WritableFileWriter>&& dest,
: dest_(std::move(dest)), block_offset_(0) { uint64_t log_number, bool recycle_log_files)
: dest_(std::move(dest)),
block_offset_(0),
log_number_(log_number),
recycle_log_files_(recycle_log_files) {
for (int i = 0; i <= kMaxRecordType; i++) { for (int i = 0; i <= kMaxRecordType; i++) {
char t = static_cast<char>(i); char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1); type_crc_[i] = crc32c::Value(&t, 1);
@ -33,6 +37,10 @@ Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data(); const char* ptr = slice.data();
size_t left = slice.size(); size_t left = slice.size();
// Header size varies depending on whether we are recycling or not.
const int header_size =
recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize;
// Fragment the record if necessary and emit it. Note that if slice // Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single // is empty, we still want to iterate once to emit a single
// zero-length record // zero-length record
@ -41,32 +49,34 @@ Status Writer::AddRecord(const Slice& slice) {
do { do {
const int leftover = kBlockSize - block_offset_; const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0); assert(leftover >= 0);
if (leftover < kHeaderSize) { if (leftover < header_size) {
// Switch to a new block // Switch to a new block
if (leftover > 0) { if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize being 7) // Fill the trailer (literal below relies on kHeaderSize and
assert(kHeaderSize == 7); // kRecyclableHeaderSize being <= 11)
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover)); assert(header_size <= 11);
dest_->Append(
Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", leftover));
} }
block_offset_ = 0; block_offset_ = 0;
} }
// Invariant: we never leave < kHeaderSize bytes in a block. // Invariant: we never leave < header_size bytes in a block.
assert(static_cast<int>(kBlockSize) - block_offset_ >= kHeaderSize); assert(static_cast<int>(kBlockSize) - block_offset_ >= header_size);
const size_t avail = kBlockSize - block_offset_ - kHeaderSize; const size_t avail = kBlockSize - block_offset_ - header_size;
const size_t fragment_length = (left < avail) ? left : avail; const size_t fragment_length = (left < avail) ? left : avail;
RecordType type; RecordType type;
const bool end = (left == fragment_length); const bool end = (left == fragment_length);
if (begin && end) { if (begin && end) {
type = kFullType; type = recycle_log_files_ ? kRecyclableFullType : kFullType;
} else if (begin) { } else if (begin) {
type = kFirstType; type = recycle_log_files_ ? kRecyclableFirstType : kFirstType;
} else if (end) { } else if (end) {
type = kLastType; type = recycle_log_files_ ? kRecyclableLastType : kLastType;
} else { } else {
type = kMiddleType; type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType;
} }
s = EmitPhysicalRecord(type, ptr, fragment_length); s = EmitPhysicalRecord(type, ptr, fragment_length);
@ -79,28 +89,48 @@ Status Writer::AddRecord(const Slice& slice) {
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
assert(n <= 0xffff); // Must fit in two bytes assert(n <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + n <= kBlockSize);
size_t header_size;
char buf[kRecyclableHeaderSize];
// Format the header // Format the header
char buf[kHeaderSize];
buf[4] = static_cast<char>(n & 0xff); buf[4] = static_cast<char>(n & 0xff);
buf[5] = static_cast<char>(n >> 8); buf[5] = static_cast<char>(n >> 8);
buf[6] = static_cast<char>(t); buf[6] = static_cast<char>(t);
uint32_t crc = type_crc_[t];
if (t < kRecyclableFullType) {
// Legacy record format
assert(block_offset_ + kHeaderSize + n <= kBlockSize);
header_size = kHeaderSize;
} else {
// Recyclable record format
assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize);
header_size = kRecyclableHeaderSize;
// Only encode low 32-bits of the 64-bit log number. This means
// we will fail to detect an old record if we recycled a log from
// ~4 billion logs ago, but that is effectively impossible, and
// even if it were we'dbe far more likely to see a false positive
// on the 32-bit CRC.
EncodeFixed32(buf + 7, static_cast<uint32_t>(log_number_));
crc = crc32c::Extend(crc, buf + 7, 4);
}
// Compute the crc of the record type and the payload. // Compute the crc of the record type and the payload.
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n); crc = crc32c::Extend(crc, ptr, n);
crc = crc32c::Mask(crc); // Adjust for storage crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc); EncodeFixed32(buf, crc);
// Write the header and the payload // Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize)); Status s = dest_->Append(Slice(buf, header_size));
if (s.ok()) { if (s.ok()) {
s = dest_->Append(Slice(ptr, n)); s = dest_->Append(Slice(ptr, n));
if (s.ok()) { if (s.ok()) {
s = dest_->Flush(); s = dest_->Flush();
} }
} }
block_offset_ += kHeaderSize + n; block_offset_ += header_size + n;
return s; return s;
} }

@ -43,7 +43,7 @@ namespace log {
* Data is written out in kBlockSize chunks. If next record does not fit * Data is written out in kBlockSize chunks. If next record does not fit
* into the space left, the leftover space will be padded with \0. * into the space left, the leftover space will be padded with \0.
* *
* Record format: * Legacy record format:
* *
* +---------+-----------+-----------+--- ... ---+ * +---------+-----------+-----------+--- ... ---+
* |CRC (4B) | Size (2B) | Type (1B) | Payload | * |CRC (4B) | Size (2B) | Type (1B) | Payload |
@ -57,13 +57,23 @@ namespace log {
* blocks that are larger than kBlockSize * blocks that are larger than kBlockSize
* Payload = Byte stream as long as specified by the payload size * Payload = Byte stream as long as specified by the payload size
* *
* Recyclable record format:
*
* +---------+-----------+-----------+----------------+--- ... ---+
* |CRC (4B) | Size (2B) | Type (1B) | Log number (4B)| Payload |
* +---------+-----------+-----------+----------------+--- ... ---+
*
* Same as above, with the addition of
* Log number = 32bit log file number, so that we can distinguish between
* records written by the most recent log writer vs a previous one.
*/ */
class Writer { class Writer {
public: public:
// Create a writer that will append data to "*dest". // Create a writer that will append data to "*dest".
// "*dest" must be initially empty. // "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use. // "*dest" must remain live while this Writer is in use.
explicit Writer(unique_ptr<WritableFileWriter>&& dest); explicit Writer(unique_ptr<WritableFileWriter>&& dest,
uint64_t log_number, bool recycle_log_files);
~Writer(); ~Writer();
Status AddRecord(const Slice& slice); Status AddRecord(const Slice& slice);
@ -74,6 +84,8 @@ class Writer {
private: private:
unique_ptr<WritableFileWriter> dest_; unique_ptr<WritableFileWriter> dest_;
int block_offset_; // Current offset in block int block_offset_; // Current offset in block
uint64_t log_number_;
bool recycle_log_files_;
// crc32c values for all supported record types. These are // crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the // pre-computed to reduce the overhead of computing the crc of the

@ -249,8 +249,8 @@ class Repairer {
// corruptions cause entire commits to be skipped instead of // corruptions cause entire commits to be skipped instead of
// propagating bad information (like overly large sequence // propagating bad information (like overly large sequence
// numbers). // numbers).
log::Reader reader(std::move(lfile_reader), &reporter, log::Reader reader(options_.info_log, std::move(lfile_reader), &reporter,
true /*enable checksum*/, 0 /*initial_offset*/); true /*enable checksum*/, 0 /*initial_offset*/, log);
// Read all the records and add to a memtable // Read all the records and add to a memtable
std::string scratch; std::string scratch;
@ -413,7 +413,7 @@ class Repairer {
{ {
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options)); new WritableFileWriter(std::move(file), env_options));
log::Writer log(std::move(file_writer)); log::Writer log(std::move(file_writer), 0, false);
std::string record; std::string record;
edit_->EncodeTo(&record); edit_->EncodeTo(&record);
status = log.AddRecord(record); status = log.AddRecord(record);

@ -262,8 +262,10 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
return s; return s;
} }
assert(file); assert(file);
currentLogReader_.reset(new log::Reader(std::move(file), &reporter_, currentLogReader_.reset(new log::Reader(options_->info_log,
read_options_.verify_checksums_, 0)); std::move(file), &reporter_,
read_options_.verify_checksums_, 0,
logFile->LogNumber()));
return Status::OK(); return Status::OK();
} }
} // namespace rocksdb } // namespace rocksdb

@ -2126,7 +2126,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(descriptor_file), opt_env_opts)); new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
descriptor_log_.reset(new log::Writer(std::move(file_writer))); descriptor_log_.reset(new log::Writer(std::move(file_writer), 0, false));
s = WriteSnapshot(descriptor_log_.get()); s = WriteSnapshot(descriptor_log_.get());
} }
} }
@ -2390,8 +2390,8 @@ Status VersionSet::Recover(
{ {
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(std::move(manifest_file_reader), &reporter, log::Reader reader(NULL, std::move(manifest_file_reader), &reporter,
true /*checksum*/, 0 /*initial_offset*/); true /*checksum*/, 0 /*initial_offset*/, 0);
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@ -2643,8 +2643,8 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
column_family_names.insert({0, kDefaultColumnFamilyName}); column_family_names.insert({0, kDefaultColumnFamilyName});
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/, log::Reader reader(NULL, std::move(file_reader), &reporter, true /*checksum*/,
0 /*initial_offset*/); 0 /*initial_offset*/, 0);
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@ -2801,8 +2801,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
{ {
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/, log::Reader reader(NULL, std::move(file_reader), &reporter,
0 /*initial_offset*/); true /*checksum*/, 0 /*initial_offset*/, 0);
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@ -3054,7 +3054,8 @@ bool VersionSet::ManifestContains(uint64_t manifest_file_num,
} }
file_reader.reset(new SequentialFileReader(std::move(file))); file_reader.reset(new SequentialFileReader(std::move(file)));
} }
log::Reader reader(std::move(file_reader), nullptr, true /*checksum*/, 0); log::Reader reader(NULL, std::move(file_reader), nullptr,
true /*checksum*/, 0, 0);
Slice r; Slice r;
std::string scratch; std::string scratch;
bool result = false; bool result = false;

@ -448,8 +448,8 @@ Status WalManager::ReadFirstLine(const std::string& fname,
reporter.fname = fname.c_str(); reporter.fname = fname.c_str();
reporter.status = &status; reporter.status = &status;
reporter.ignore_error = !db_options_.paranoid_checks; reporter.ignore_error = !db_options_.paranoid_checks;
log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/, log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
0 /*initial_offset*/); true /*checksum*/, 0 /*initial_offset*/, *sequence);
std::string scratch; std::string scratch;
Slice record; Slice record;

@ -77,7 +77,7 @@ class WalManagerTest : public testing::Test {
ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_)); ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_));
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options_)); new WritableFileWriter(std::move(file), env_options_));
current_log_writer_.reset(new log::Writer(std::move(file_writer))); current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false));
} }
void CreateArchiveLogs(int num_logs, int entries_per_log) { void CreateArchiveLogs(int num_logs, int entries_per_log) {
@ -127,7 +127,8 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) {
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), EnvOptions())); new WritableFileWriter(std::move(file), EnvOptions()));
log::Writer writer(std::move(file_writer)); log::Writer writer(std::move(file_writer), 1,
db_options_.recycle_log_file_num > 0);
WriteBatch batch; WriteBatch batch;
batch.Put("foo", "bar"); batch.Put("foo", "bar");
WriteBatchInternal::SetSequence(&batch, 10); WriteBatchInternal::SetSequence(&batch, 10);

@ -572,6 +572,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_log_file_time_to_roll(
rocksdb_options_t*, size_t); rocksdb_options_t*, size_t);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_keep_log_file_num( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_keep_log_file_num(
rocksdb_options_t*, size_t); rocksdb_options_t*, size_t);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_recycle_log_file_num(
rocksdb_options_t*, size_t);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_soft_rate_limit( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_soft_rate_limit(
rocksdb_options_t*, double); rocksdb_options_t*, double);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_hard_rate_limit( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_hard_rate_limit(

@ -139,6 +139,12 @@ class Env {
unique_ptr<WritableFile>* result, unique_ptr<WritableFile>* result,
const EnvOptions& options) = 0; const EnvOptions& options) = 0;
// Reuse an existing file by renaming it and opening it as writable.
virtual Status ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options);
// Create an object that represents a directory. Will fail if directory // Create an object that represents a directory. Will fail if directory
// doesn't exist. If the directory exists, it will open the directory // doesn't exist. If the directory exists, it will open the directory
// and create a new Directory object. // and create a new Directory object.

@ -957,6 +957,16 @@ struct DBOptions {
// Default: 1000 // Default: 1000
size_t keep_log_file_num; size_t keep_log_file_num;
// Recycle log files.
// If non-zero, we will reuse previously written log files for new
// logs, overwriting the old data. The value indicates how many
// such files we will keep around at any point in time for later
// use. This is more efficient because the blocks are already
// allocated and fdatasync does not need to update the inode after
// each write.
// Default: 0
size_t recycle_log_file_num;
// manifest file is rolled over on reaching this limit. // manifest file is rolled over on reaching this limit.
// The older manifest file be deleted. // The older manifest file be deleted.
// The default value is MAX_INT so that roll-over does not take place. // The default value is MAX_INT so that roll-over does not take place.

@ -576,6 +576,33 @@ void Java_org_rocksdb_Options_setKeepLogFileNum(
} }
} }
/*
* Class: org_rocksdb_Options
* Method: recycleLogFiles
* Signature: (J)J
*/
jlong Java_org_rocksdb_Options_recycleLogFileNum(JNIEnv* env, jobject jobj,
jlong jhandle) {
return reinterpret_cast<rocksdb::Options*>(jhandle)->recycle_log_file_num;
}
/*
* Class: org_rocksdb_Options
* Method: setRecycleLogFiles
* Signature: (JJ)V
*/
void Java_org_rocksdb_Options_setRecycleLogFiles(JNIEnv* env, jobject jobj,
jlong jhandle,
jlong recycle_log_file_num) {
rocksdb::Status s = rocksdb::check_if_jlong_fits_size_t(recycle_log_file_num);
if (s.ok()) {
reinterpret_cast<rocksdb::Options*>(jhandle)->recycle_log_file_num =
recycle_log_file_num;
} else {
rocksdb::IllegalArgumentExceptionJni::ThrowNew(env, s);
}
}
/* /*
* Class: org_rocksdb_Options * Class: org_rocksdb_Options
* Method: maxManifestFileSize * Method: maxManifestFileSize
@ -3533,6 +3560,32 @@ jlong Java_org_rocksdb_DBOptions_keepLogFileNum(
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->keep_log_file_num; return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->keep_log_file_num;
} }
/*
* Class: org_rocksdb_DBOptions
* Method: setRecycleLogFiles
* Signature: (JJ)V
*/
void Java_org_rocksdb_DBOptions_setRecycleLogFileNum(
JNIEnv* env, jobject jobj, jlong jhandle, jlong recycle_log_file_num) {
rocksdb::Status s = rocksdb::check_if_jlong_fits_size_t(recycle_log_file_num);
if (s.ok()) {
reinterpret_cast<rocksdb::DBOptions*>(jhandle)->recycle_log_file_num =
recycle_log_file_num;
} else {
rocksdb::IllegalArgumentExceptionJni::ThrowNew(env, s);
}
}
/*
* Class: org_rocksdb_DBOptions
* Method: recycleLogFiles
* Signature: (J)J
*/
jlong Java_org_rocksdb_DBOptions_recycleLogFileNum(JNIEnv* env, jobject jobj,
jlong jhandle) {
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->recycle_log_file_num;
}
/* /*
* Class: org_rocksdb_DBOptions * Class: org_rocksdb_DBOptions
* Method: setMaxManifestFileSize * Method: setMaxManifestFileSize

@ -1438,7 +1438,21 @@ void DumpWalFile(std::string wal_file, bool print_header, bool print_values,
} }
} else { } else {
StdErrReporter reporter; StdErrReporter reporter;
log::Reader reader(move(wal_file_reader), &reporter, true, 0); uint64_t log_number;
FileType type;
// we need the log number, but ParseFilename expects dbname/NNN.log.
string sanitized = wal_file;
size_t lastslash = sanitized.rfind('/');
if (lastslash != std::string::npos)
sanitized = sanitized.substr(lastslash + 1);
if (!ParseFileName(sanitized, &log_number, &type)) {
// bogus input, carry on as best we can
log_number = 0;
}
DBOptions db_options;
log::Reader reader(db_options.info_log, move(wal_file_reader), &reporter,
true, 0, log_number);
string scratch; string scratch;
WriteBatch batch; WriteBatch batch;
Slice record; Slice record;

@ -27,6 +27,17 @@ uint64_t Env::GetThreadID() const {
return hasher(std::this_thread::get_id()); return hasher(std::this_thread::get_id());
} }
Status Env::ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) {
Status s = RenameFile(old_fname, fname);
if (!s.ok()) {
return s;
}
return NewWritableFile(fname, result, options);
}
SequentialFile::~SequentialFile() { SequentialFile::~SequentialFile() {
} }

@ -205,6 +205,50 @@ class PosixEnv : public Env {
return s; return s;
} }
virtual Status ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) override {
result->reset();
Status s;
int fd = -1;
do {
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(old_fname.c_str(), O_RDWR, 0644);
} while (fd < 0 && errno == EINTR);
if (fd < 0) {
s = IOError(fname, errno);
} else {
SetFD_CLOEXEC(fd, &options);
// rename into place
if (rename(old_fname.c_str(), fname.c_str()) != 0) {
Status r = IOError(old_fname, errno);
close(fd);
return r;
}
if (options.use_mmap_writes) {
if (!checkedDiskForMmap_) {
// this will be executed once in the program's lifetime.
// do not use mmapWrite on non ext-3/xfs/tmpfs systems.
if (!SupportsFastAllocate(fname)) {
forceMmapOff = true;
}
checkedDiskForMmap_ = true;
}
}
if (options.use_mmap_writes && !forceMmapOff) {
result->reset(new PosixMmapFile(fname, fd, page_size_, options));
} else {
// disable mmap writes
EnvOptions no_mmap_writes_options = options;
no_mmap_writes_options.use_mmap_writes = false;
result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
}
}
return s;
}
virtual Status NewDirectory(const std::string& name, virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) override { unique_ptr<Directory>* result) override {
result->reset(); result->reset();

@ -231,6 +231,7 @@ DBOptions::DBOptions()
max_log_file_size(0), max_log_file_size(0),
log_file_time_to_roll(0), log_file_time_to_roll(0),
keep_log_file_num(1000), keep_log_file_num(1000),
recycle_log_file_num(0),
max_manifest_file_size(std::numeric_limits<uint64_t>::max()), max_manifest_file_size(std::numeric_limits<uint64_t>::max()),
table_cache_numshardbits(4), table_cache_numshardbits(4),
WAL_ttl_seconds(0), WAL_ttl_seconds(0),
@ -285,6 +286,7 @@ DBOptions::DBOptions(const Options& options)
max_log_file_size(options.max_log_file_size), max_log_file_size(options.max_log_file_size),
log_file_time_to_roll(options.log_file_time_to_roll), log_file_time_to_roll(options.log_file_time_to_roll),
keep_log_file_num(options.keep_log_file_num), keep_log_file_num(options.keep_log_file_num),
recycle_log_file_num(options.recycle_log_file_num),
max_manifest_file_size(options.max_manifest_file_size), max_manifest_file_size(options.max_manifest_file_size),
table_cache_numshardbits(options.table_cache_numshardbits), table_cache_numshardbits(options.table_cache_numshardbits),
WAL_ttl_seconds(options.WAL_ttl_seconds), WAL_ttl_seconds(options.WAL_ttl_seconds),
@ -338,6 +340,8 @@ void DBOptions::Dump(Logger* log) const {
log_file_time_to_roll); log_file_time_to_roll);
Header(log, " Options.keep_log_file_num: %" ROCKSDB_PRIszt, Header(log, " Options.keep_log_file_num: %" ROCKSDB_PRIszt,
keep_log_file_num); keep_log_file_num);
Header(log, " Options.recycle_log_file_num: %" ROCKSDB_PRIszt,
recycle_log_file_num);
Header(log, " Options.allow_os_buffer: %d", allow_os_buffer); Header(log, " Options.allow_os_buffer: %d", allow_os_buffer);
Header(log, " Options.allow_mmap_reads: %d", allow_mmap_reads); Header(log, " Options.allow_mmap_reads: %d", allow_mmap_reads);
Header(log, " Options.allow_fallocate: %d", allow_fallocate); Header(log, " Options.allow_fallocate: %d", allow_fallocate);

@ -207,6 +207,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"keep_log_file_num", {"keep_log_file_num",
{offsetof(struct DBOptions, keep_log_file_num), OptionType::kSizeT, {offsetof(struct DBOptions, keep_log_file_num), OptionType::kSizeT,
OptionVerificationType::kNormal}}, OptionVerificationType::kNormal}},
{"recycle_log_file_num",
{offsetof(struct DBOptions, recycle_log_file_num), OptionType::kSizeT,
OptionVerificationType::kNormal}},
{"log_file_time_to_roll", {"log_file_time_to_roll",
{offsetof(struct DBOptions, log_file_time_to_roll), OptionType::kSizeT, {offsetof(struct DBOptions, log_file_time_to_roll), OptionType::kSizeT,
OptionVerificationType::kNormal}}, OptionVerificationType::kNormal}},

@ -323,6 +323,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"max_log_file_size", "37"}, {"max_log_file_size", "37"},
{"log_file_time_to_roll", "38"}, {"log_file_time_to_roll", "38"},
{"keep_log_file_num", "39"}, {"keep_log_file_num", "39"},
{"recycle_log_file_num", "5"},
{"max_manifest_file_size", "40"}, {"max_manifest_file_size", "40"},
{"table_cache_numshardbits", "41"}, {"table_cache_numshardbits", "41"},
{"WAL_ttl_seconds", "43"}, {"WAL_ttl_seconds", "43"},
@ -339,7 +340,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"new_table_reader_for_compaction_inputs", "true"}, {"new_table_reader_for_compaction_inputs", "true"},
{"compaction_readahead_size", "100"}, {"compaction_readahead_size", "100"},
{"bytes_per_sync", "47"}, {"bytes_per_sync", "47"},
{"wal_bytes_per_sync", "48"}, }; {"wal_bytes_per_sync", "48"},
};
ColumnFamilyOptions base_cf_opt; ColumnFamilyOptions base_cf_opt;
ColumnFamilyOptions new_cf_opt; ColumnFamilyOptions new_cf_opt;
@ -431,6 +433,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.max_log_file_size, 37U); ASSERT_EQ(new_db_opt.max_log_file_size, 37U);
ASSERT_EQ(new_db_opt.log_file_time_to_roll, 38U); ASSERT_EQ(new_db_opt.log_file_time_to_roll, 38U);
ASSERT_EQ(new_db_opt.keep_log_file_num, 39U); ASSERT_EQ(new_db_opt.keep_log_file_num, 39U);
ASSERT_EQ(new_db_opt.recycle_log_file_num, 5U);
ASSERT_EQ(new_db_opt.max_manifest_file_size, static_cast<uint64_t>(40)); ASSERT_EQ(new_db_opt.max_manifest_file_size, static_cast<uint64_t>(40));
ASSERT_EQ(new_db_opt.table_cache_numshardbits, 41); ASSERT_EQ(new_db_opt.table_cache_numshardbits, 41);
ASSERT_EQ(new_db_opt.WAL_ttl_seconds, static_cast<uint64_t>(43)); ASSERT_EQ(new_db_opt.WAL_ttl_seconds, static_cast<uint64_t>(43));
@ -692,6 +695,7 @@ void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) {
db_opt->skip_stats_update_on_db_open = rnd->Uniform(2); db_opt->skip_stats_update_on_db_open = rnd->Uniform(2);
db_opt->use_adaptive_mutex = rnd->Uniform(2); db_opt->use_adaptive_mutex = rnd->Uniform(2);
db_opt->use_fsync = rnd->Uniform(2); db_opt->use_fsync = rnd->Uniform(2);
db_opt->recycle_log_file_num = rnd->Uniform(2);
// int options // int options
db_opt->max_background_compactions = rnd->Uniform(100); db_opt->max_background_compactions = rnd->Uniform(100);

Loading…
Cancel
Save