Using existing crc32c checksum in checksum handoff for Manifest and WAL (#8412)

Summary:
In PR https://github.com/facebook/rocksdb/issues/7523 , checksum handoff is introduced in RocksDB for WAL, Manifest, and SST files. When user enable checksum handoff for a certain type of file, before the data is written to the lower layer storage system, we calculate the checksum (crc32c) of each piece of data and pass the checksum down with the data, such that data verification can be down by the lower layer storage system if it has the capability. However, it cannot cover the whole lifetime of the data in the memory and also it potentially introduces extra checksum calculation overhead.

In this PR, we introduce a new interface in WritableFileWriter::Append, which allows the caller be able to pass the data and the checksum (crc32c) together. In this way, WritableFileWriter can directly use the pass-in checksum (crc32c) to generate the checksum of data being passed down to the storage system. It saves the calculation overhead and achieves higher protection coverage. When a new checksum is added with the data, we use Crc32cCombine https://github.com/facebook/rocksdb/issues/8305 to combine the existing checksum and the new checksum. To avoid the segmenting of data by rate-limiter before it is stored, rate-limiter is called enough times to accumulate enough credits for a certain write. This design only support Manifest and WAL which use log_writer in the current stage.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8412

Test Plan: make check, add new testing cases.

Reviewed By: anand1976

Differential Revision: D29151545

Pulled By: zhichao-cao

fbshipit-source-id: 75e2278c5126cfd58393c67b1efd18dcc7a30772
main
Zhichao Cao 3 years ago committed by Facebook GitHub Bot
parent 3d844dff1d
commit a904c62d28
  1. 2
      db/blob/blob_file_builder.cc
  2. 2
      db/builder.cc
  3. 2
      db/compaction/compaction_job.cc
  4. 6
      db/db_impl/db_impl_open.cc
  5. 5
      db/log_writer.cc
  6. 1
      db/version_set.cc
  7. 265
      file/writable_file_writer.cc
  8. 15
      file/writable_file_writer.h
  9. 2
      table/sst_file_writer.cc
  10. 1
      util/crc32c.cc
  11. 177
      util/file_reader_writer_test.cc
  12. 39
      utilities/fault_injection_fs.cc
  13. 10
      utilities/fault_injection_fs.h

@ -191,7 +191,7 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
immutable_options_->clock, io_tracer_, statistics,
immutable_options_->listeners,
immutable_options_->file_checksum_gen_factory.get(),
tmp_set.Contains(FileType::kBlobFile)));
tmp_set.Contains(FileType::kBlobFile), false));
constexpr bool do_flush = false;

@ -165,7 +165,7 @@ Status BuildTable(
std::move(file), fname, file_options, ioptions.clock, io_tracer,
ioptions.stats, ioptions.listeners,
ioptions.file_checksum_gen_factory.get(),
tmp_set.Contains(FileType::kTableFile)));
tmp_set.Contains(FileType::kTableFile), false));
builder = NewTableBuilder(tboptions, file_writer.get());
}

@ -1986,7 +1986,7 @@ Status CompactionJob::OpenCompactionOutputFile(
std::move(writable_file), fname, file_options_, db_options_.clock,
io_tracer_, db_options_.stats, listeners,
db_options_.file_checksum_gen_factory.get(),
tmp_set.Contains(FileType::kTableFile)));
tmp_set.Contains(FileType::kTableFile), false));
TableBuilderOptions tboptions(
*cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),

@ -300,7 +300,8 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(file), manifest, file_options, immutable_db_options_.clock,
io_tracer_, nullptr /* stats */, immutable_db_options_.listeners,
nullptr, tmp_set.Contains(FileType::kDescriptorFile)));
nullptr, tmp_set.Contains(FileType::kDescriptorFile),
tmp_set.Contains(FileType::kDescriptorFile)));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
@ -1540,7 +1541,8 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(lfile), log_fname, opt_file_options,
immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
nullptr, tmp_set.Contains(FileType::kWalFile)));
nullptr, tmp_set.Contains(FileType::kWalFile),
tmp_set.Contains(FileType::kWalFile)));
*new_log = new log::Writer(std::move(file_writer), log_file_num,
immutable_db_options_.recycle_log_file_num > 0,
immutable_db_options_.manual_wal_flush);

@ -145,7 +145,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
}
// Compute the crc of the record type and the payload.
crc = crc32c::Extend(crc, ptr, n);
uint32_t payload_crc = crc32c::Value(ptr, n);
crc = crc32c::Crc32cCombine(crc, payload_crc, n);
crc = crc32c::Mask(crc); // Adjust for storage
TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
&crc);
@ -154,7 +155,7 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
// Write the header and the payload
IOStatus s = dest_->Append(Slice(buf, header_size));
if (s.ok()) {
s = dest_->Append(Slice(ptr, n));
s = dest_->Append(Slice(ptr, n), payload_crc);
}
block_offset_ += header_size + n;
return s;

@ -4133,6 +4133,7 @@ Status VersionSet::ProcessManifestWrites(
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(descriptor_file), descriptor_fname, opt_file_opts, clock_,
io_tracer_, nullptr, db_options_->listeners, nullptr,
tmp_set.Contains(FileType::kDescriptorFile),
tmp_set.Contains(FileType::kDescriptorFile)));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));

@ -36,7 +36,8 @@ Status WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
return s;
}
IOStatus WritableFileWriter::Append(const Slice& data) {
IOStatus WritableFileWriter::Append(const Slice& data,
uint32_t crc32c_checksum) {
const char* src = data.data();
size_t left = data.size();
IOStatus s;
@ -81,26 +82,74 @@ IOStatus WritableFileWriter::Append(const Slice& data) {
assert(buf_.CurrentSize() == 0);
}
// We never write directly to disk with direct I/O on.
// or we simply use it for its original purpose to accumulate many small
// chunks
if (use_direct_io() || (buf_.Capacity() >= left)) {
while (left > 0) {
size_t appended = buf_.Append(src, left);
left -= appended;
src += appended;
if (left > 0) {
s = Flush();
if (!s.ok()) {
break;
if (perform_data_verification_ && buffered_data_with_checksum_ &&
crc32c_checksum != 0) {
// Since we want to use the checksum of the input data, we cannot break it
// into several pieces. We will only write them in the buffer when buffer
// size is enough. Otherwise, we will directly write it down.
if (use_direct_io() || (buf_.Capacity() - buf_.CurrentSize()) >= left) {
if ((buf_.Capacity() - buf_.CurrentSize()) >= left) {
size_t appended = buf_.Append(src, left);
if (appended != left) {
s = IOStatus::Corruption("Write buffer append failure");
}
buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine(
buffered_data_crc32c_checksum_, crc32c_checksum, appended);
} else {
while (left > 0) {
size_t appended = buf_.Append(src, left);
buffered_data_crc32c_checksum_ =
crc32c::Extend(buffered_data_crc32c_checksum_, src, appended);
left -= appended;
src += appended;
if (left > 0) {
s = Flush();
if (!s.ok()) {
break;
}
}
}
}
} else {
assert(buf_.CurrentSize() == 0);
buffered_data_crc32c_checksum_ = crc32c_checksum;
s = WriteBufferedWithChecksum(src, left);
}
} else {
// Writing directly to file bypassing the buffer
assert(buf_.CurrentSize() == 0);
s = WriteBuffered(src, left);
// In this case, either we do not need to do the data verification or
// caller does not provide the checksum of the data (crc32c_checksum = 0).
//
// We never write directly to disk with direct I/O on.
// or we simply use it for its original purpose to accumulate many small
// chunks
if (use_direct_io() || (buf_.Capacity() >= left)) {
while (left > 0) {
size_t appended = buf_.Append(src, left);
if (perform_data_verification_ && buffered_data_with_checksum_) {
buffered_data_crc32c_checksum_ =
crc32c::Extend(buffered_data_crc32c_checksum_, src, appended);
}
left -= appended;
src += appended;
if (left > 0) {
s = Flush();
if (!s.ok()) {
break;
}
}
}
} else {
// Writing directly to file bypassing the buffer
assert(buf_.CurrentSize() == 0);
if (perform_data_verification_ && buffered_data_with_checksum_) {
buffered_data_crc32c_checksum_ = crc32c::Value(src, left);
s = WriteBufferedWithChecksum(src, left);
} else {
s = WriteBuffered(src, left);
}
}
}
TEST_KILL_RANDOM("WritableFileWriter::Append:1");
@ -114,6 +163,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes) {
assert(pad_bytes < kDefaultPageSize);
size_t left = pad_bytes;
size_t cap = buf_.Capacity() - buf_.CurrentSize();
size_t pad_start = buf_.CurrentSize();
// Assume pad_bytes is small compared to buf_ capacity. So we always
// use buf_ rather than write directly to file in certain cases like
@ -132,6 +182,11 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes) {
}
pending_sync_ = true;
filesize_ += pad_bytes;
if (perform_data_verification_) {
buffered_data_crc32c_checksum_ =
crc32c::Extend(buffered_data_crc32c_checksum_,
buf_.BufferStart() + pad_start, pad_bytes);
}
return IOStatus::OK();
}
@ -232,11 +287,19 @@ IOStatus WritableFileWriter::Flush() {
if (use_direct_io()) {
#ifndef ROCKSDB_LITE
if (pending_sync_) {
s = WriteDirect();
if (perform_data_verification_ && buffered_data_with_checksum_) {
s = WriteDirectWithChecksum();
} else {
s = WriteDirect();
}
}
#endif // !ROCKSDB_LITE
} else {
s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
if (perform_data_verification_ && buffered_data_with_checksum_) {
s = WriteBufferedWithChecksum(buf_.BufferStart(), buf_.CurrentSize());
} else {
s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
}
}
if (!s.ok()) {
return s;
@ -451,6 +514,76 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
src += allowed;
}
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
return s;
}
IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data,
size_t size) {
IOStatus s;
assert(!use_direct_io());
assert(perform_data_verification_ && buffered_data_with_checksum_);
const char* src = data;
size_t left = size;
DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)];
// Check how much is allowed. Here, we loop until the rate limiter allows to
// write the entire buffer.
// TODO: need to be improved since it sort of defeats the purpose of the rate
// limiter
size_t data_size = left;
if (rate_limiter_ != nullptr) {
while (data_size > 0) {
size_t tmp_size;
tmp_size = rate_limiter_->RequestToken(
data_size, buf_.Alignment(), writable_file_->GetIOPriority(), stats_,
RateLimiter::OpType::kWrite);
data_size -= tmp_size;
}
}
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
#ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint start_ts;
uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
old_size = next_write_offset_;
}
#endif
{
auto prev_perf_level = GetPerfLevel();
IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->Append(Slice(src, left), IOOptions(), v_info,
nullptr);
SetPerfLevel(prev_perf_level);
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileWriteFinish(old_size, left, start_ts, finish_ts, s);
}
#endif
if (!s.ok()) {
return s;
}
}
IOSTATS_ADD(bytes_written, left);
TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0");
// Buffer write is successful, reset the buffer current size to 0 and reset
// the corresponding checksum value
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
return s;
}
@ -565,5 +698,99 @@ IOStatus WritableFileWriter::WriteDirect() {
}
return s;
}
IOStatus WritableFileWriter::WriteDirectWithChecksum() {
assert(use_direct_io());
assert(perform_data_verification_ && buffered_data_with_checksum_);
IOStatus s;
const size_t alignment = buf_.Alignment();
assert((next_write_offset_ % alignment) == 0);
// Calculate whole page final file advance if all writes succeed
size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());
// Calculate the leftover tail, we write it here padded with zeros BUT we
// will write
// it again in the future either on Close() OR when the current whole page
// fills out
size_t leftover_tail = buf_.CurrentSize() - file_advance;
// Round up, pad, and combine the checksum.
size_t last_cur_size = buf_.CurrentSize();
buf_.PadToAlignmentWith(0);
size_t padded_size = buf_.CurrentSize() - last_cur_size;
const char* padded_start = buf_.BufferStart() + last_cur_size;
uint32_t padded_checksum = crc32c::Value(padded_start, padded_size);
buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine(
buffered_data_crc32c_checksum_, padded_checksum, padded_size);
const char* src = buf_.BufferStart();
uint64_t write_offset = next_write_offset_;
size_t left = buf_.CurrentSize();
DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)];
// Check how much is allowed. Here, we loop until the rate limiter allows to
// write the entire buffer.
// TODO: need to be improved since it sort of defeats the purpose of the rate
// limiter
size_t data_size = left;
if (rate_limiter_ != nullptr) {
while (data_size > 0) {
size_t size;
size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
writable_file_->GetIOPriority(),
stats_, RateLimiter::OpType::kWrite);
data_size -= size;
}
}
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
FileOperationInfo::StartTimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
}
// direct writes must be positional
EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->PositionedAppend(Slice(src, left), write_offset,
IOOptions(), v_info, nullptr);
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileWriteFinish(write_offset, left, start_ts, finish_ts, s);
}
if (!s.ok()) {
// In this case, we do not change buffered_data_crc32c_checksum_ because
// it still aligns with the data in the buffer.
buf_.Size(file_advance + leftover_tail);
buffered_data_crc32c_checksum_ =
crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
return s;
}
}
IOSTATS_ADD(bytes_written, left);
assert((next_write_offset_ % alignment) == 0);
if (s.ok()) {
// Move the tail to the beginning of the buffer
// This never happens during normal Append but rather during
// explicit call to Flush()/Sync() or Close(). Also the buffer checksum will
// recalculated accordingly.
buf_.RefitTail(file_advance, leftover_tail);
// Adjust the checksum value to align with the data in the buffer
buffered_data_crc32c_checksum_ =
crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
// This is where we start writing next time which may or not be
// the actual file size on disk. They match if the buffer size
// is a multiple of whole pages otherwise filesize_ is leftover_tail
// behind
next_write_offset_ += file_advance;
}
return s;
}
#endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

@ -144,6 +144,8 @@ class WritableFileWriter {
std::unique_ptr<FileChecksumGenerator> checksum_generator_;
bool checksum_finalized_;
bool perform_data_verification_;
uint32_t buffered_data_crc32c_checksum_;
bool buffered_data_with_checksum_;
public:
WritableFileWriter(
@ -153,7 +155,8 @@ class WritableFileWriter {
Statistics* stats = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
FileChecksumGenFactory* file_checksum_gen_factory = nullptr,
bool perform_data_verification = false)
bool perform_data_verification = false,
bool buffered_data_with_checksum = false)
: file_name_(_file_name),
writable_file_(std::move(file), io_tracer, _file_name),
clock_(clock),
@ -171,7 +174,9 @@ class WritableFileWriter {
listeners_(),
checksum_generator_(nullptr),
checksum_finalized_(false),
perform_data_verification_(perform_data_verification) {
perform_data_verification_(perform_data_verification),
buffered_data_crc32c_checksum_(0),
buffered_data_with_checksum_(buffered_data_with_checksum) {
TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
reinterpret_cast<void*>(max_buffer_size_));
buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
@ -210,7 +215,9 @@ class WritableFileWriter {
std::string file_name() const { return file_name_; }
IOStatus Append(const Slice& data);
// When this Append API is called, if the crc32c_checksum is not provided, we
// will calculate the checksum internally.
IOStatus Append(const Slice& data, uint32_t crc32c_checksum = 0);
IOStatus Pad(const size_t pad_bytes);
@ -251,9 +258,11 @@ class WritableFileWriter {
// DMA such as in Direct I/O mode
#ifndef ROCKSDB_LITE
IOStatus WriteDirect();
IOStatus WriteDirectWithChecksum();
#endif // !ROCKSDB_LITE
// Normal write
IOStatus WriteBuffered(const char* data, size_t size);
IOStatus WriteBufferedWithChecksum(const char* data, size_t size);
IOStatus RangeSync(uint64_t offset, uint64_t nbytes);
IOStatus SyncInternal(bool use_fsync);
};

@ -264,7 +264,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
std::move(sst_file), file_path, r->env_options, r->ioptions.clock,
nullptr /* io_tracer */, nullptr /* stats */, r->ioptions.listeners,
r->ioptions.file_checksum_gen_factory.get(),
tmp_set.Contains(FileType::kTableFile)));
tmp_set.Contains(FileType::kTableFile), false));
// TODO(tec) : If table_factory is using compressed block cache, we will
// be adding the external sst file blocks into it, which is wasteful.

@ -1283,7 +1283,6 @@ uint32_t Extend(uint32_t crc, const char* buf, size_t size) {
return ChosenExtend(crc, buf, size);
}
// The code for crc32c combine, copied with permission from folly
// Standard galois-field multiply. The only modification is that a,

@ -6,16 +6,20 @@
#include <algorithm>
#include <vector>
#include "db/db_test_util.h"
#include "env/mock_env.h"
#include "file/line_file_reader.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "file/readahead_raf.h"
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "rocksdb/file_system.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/crc32c.h"
#include "util/random.h"
#include "utilities/fault_injection_fs.h"
namespace ROCKSDB_NAMESPACE {
@ -230,6 +234,179 @@ TEST_F(WritableFileWriterTest, IncrementalBuffer) {
}
}
class DBWritableFileWriterTest : public DBTestBase {
public:
DBWritableFileWriterTest()
: DBTestBase("/db_secondary_cache_test", /*env_do_fsync=*/true) {
fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem()));
fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_));
}
std::shared_ptr<FaultInjectionTestFS> fault_fs_;
std::unique_ptr<Env> fault_env_;
};
TEST_F(DBWritableFileWriterTest, AppendWithChecksum) {
FileOptions file_options = FileOptions();
Options options = GetDefaultOptions();
options.create_if_missing = true;
DestroyAndReopen(options);
std::string fname = this->dbname_ + "/test_file";
std::unique_ptr<FSWritableFile> writable_file_ptr;
ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
/*dbg*/ nullptr));
std::unique_ptr<TestFSWritableFile> file;
file.reset(new TestFSWritableFile(
fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
std::unique_ptr<WritableFileWriter> file_writer;
ImmutableOptions ioptions(options);
file_writer.reset(new WritableFileWriter(
std::move(file), fname, file_options, SystemClock::Default().get(),
nullptr, ioptions.stats, ioptions.listeners,
ioptions.file_checksum_gen_factory.get(), true, true));
Random rnd(301);
std::string data = rnd.RandomString(1000);
uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size());
fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
ASSERT_OK(file_writer->Flush());
Random size_r(47);
for (int i = 0; i < 2000; i++) {
data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
data_crc32c = crc32c::Value(data.c_str(), data.size());
ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
ASSERT_OK(file_writer->Append(Slice(data.c_str())));
ASSERT_OK(file_writer->Flush());
}
ASSERT_OK(file_writer->Close());
Destroy(options);
}
TEST_F(DBWritableFileWriterTest, AppendVerifyNoChecksum) {
FileOptions file_options = FileOptions();
Options options = GetDefaultOptions();
options.create_if_missing = true;
DestroyAndReopen(options);
std::string fname = this->dbname_ + "/test_file";
std::unique_ptr<FSWritableFile> writable_file_ptr;
ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
/*dbg*/ nullptr));
std::unique_ptr<TestFSWritableFile> file;
file.reset(new TestFSWritableFile(
fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
std::unique_ptr<WritableFileWriter> file_writer;
ImmutableOptions ioptions(options);
// Enable checksum handoff for this file, but do not enable buffer checksum.
// So Append with checksum logic will not be triggered
file_writer.reset(new WritableFileWriter(
std::move(file), fname, file_options, SystemClock::Default().get(),
nullptr, ioptions.stats, ioptions.listeners,
ioptions.file_checksum_gen_factory.get(), true, false));
Random rnd(301);
std::string data = rnd.RandomString(1000);
uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size());
fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
ASSERT_OK(file_writer->Flush());
Random size_r(47);
for (int i = 0; i < 1000; i++) {
data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
data_crc32c = crc32c::Value(data.c_str(), data.size());
ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
ASSERT_OK(file_writer->Append(Slice(data.c_str())));
ASSERT_OK(file_writer->Flush());
}
ASSERT_OK(file_writer->Close());
Destroy(options);
}
TEST_F(DBWritableFileWriterTest, AppendWithChecksumRateLimiter) {
FileOptions file_options = FileOptions();
file_options.rate_limiter = nullptr;
Options options = GetDefaultOptions();
options.create_if_missing = true;
DestroyAndReopen(options);
std::string fname = this->dbname_ + "/test_file";
std::unique_ptr<FSWritableFile> writable_file_ptr;
ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
/*dbg*/ nullptr));
std::unique_ptr<TestFSWritableFile> file;
file.reset(new TestFSWritableFile(
fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
std::unique_ptr<WritableFileWriter> file_writer;
ImmutableOptions ioptions(options);
// Enable checksum handoff for this file, but do not enable buffer checksum.
// So Append with checksum logic will not be triggered
file_writer.reset(new WritableFileWriter(
std::move(file), fname, file_options, SystemClock::Default().get(),
nullptr, ioptions.stats, ioptions.listeners,
ioptions.file_checksum_gen_factory.get(), true, true));
fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
Random rnd(301);
std::string data;
uint32_t data_crc32c;
uint64_t start = fault_env_->NowMicros();
Random size_r(47);
uint64_t bytes_written = 0;
for (int i = 0; i < 100; i++) {
data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
data_crc32c = crc32c::Value(data.c_str(), data.size());
ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
bytes_written += static_cast<uint64_t>(data.size());
data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
ASSERT_OK(file_writer->Append(Slice(data.c_str())));
ASSERT_OK(file_writer->Flush());
bytes_written += static_cast<uint64_t>(data.size());
}
uint64_t elapsed = fault_env_->NowMicros() - start;
double raw_rate = bytes_written * 1000000.0 / elapsed;
ASSERT_OK(file_writer->Close());
// Set the rate-limiter
FileOptions file_options1 = FileOptions();
file_options1.rate_limiter =
NewGenericRateLimiter(static_cast<int64_t>(0.5 * raw_rate));
fname = this->dbname_ + "/test_file_1";
std::unique_ptr<FSWritableFile> writable_file_ptr1;
ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options1,
&writable_file_ptr1,
/*dbg*/ nullptr));
file.reset(new TestFSWritableFile(
fname, file_options1, std::move(writable_file_ptr1), fault_fs_.get()));
// Enable checksum handoff for this file, but do not enable buffer checksum.
// So Append with checksum logic will not be triggered
file_writer.reset(new WritableFileWriter(
std::move(file), fname, file_options1, SystemClock::Default().get(),
nullptr, ioptions.stats, ioptions.listeners,
ioptions.file_checksum_gen_factory.get(), true, true));
for (int i = 0; i < 1000; i++) {
data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
data_crc32c = crc32c::Value(data.c_str(), data.size());
ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
ASSERT_OK(file_writer->Append(Slice(data.c_str())));
ASSERT_OK(file_writer->Flush());
}
ASSERT_OK(file_writer->Close());
if (file_options1.rate_limiter != nullptr) {
delete file_options1.rate_limiter;
}
Destroy(options);
}
#ifndef ROCKSDB_LITE
TEST_F(WritableFileWriterTest, AppendStatusReturn) {
class FakeWF : public FSWritableFile {

@ -140,8 +140,8 @@ IOStatus TestFSWritableFile::Append(const Slice& data, const IOOptions&,
// By setting the IngestDataCorruptionBeforeWrite(), the data corruption is
// simulated.
IOStatus TestFSWritableFile::Append(
const Slice& data, const IOOptions&,
const DataVerificationInfo& verification_info, IODebugContext*) {
const Slice& data, const IOOptions& options,
const DataVerificationInfo& verification_info, IODebugContext* dbg) {
MutexLock l(&mutex_);
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
@ -161,10 +161,39 @@ IOStatus TestFSWritableFile::Append(
"current data checksum: " + checksum;
return IOStatus::Corruption(msg);
}
if (target_->use_direct_io()) {
target_->Append(data, options, dbg).PermitUncheckedError();
} else {
state_.buffer_.append(data.data(), data.size());
state_.pos_ += data.size();
fs_->WritableFileAppended(state_);
}
return IOStatus::OK();
}
state_.buffer_.append(data.data(), data.size());
state_.pos_ += data.size();
fs_->WritableFileAppended(state_);
IOStatus TestFSWritableFile::PositionedAppend(
const Slice& data, uint64_t offset, const IOOptions& options,
const DataVerificationInfo& verification_info, IODebugContext* dbg) {
MutexLock l(&mutex_);
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
if (fs_->ShouldDataCorruptionBeforeWrite()) {
return IOStatus::Corruption("Data is corrupted!");
}
// Calculate the checksum
std::string checksum;
CalculateTypedChecksum(fs_->GetChecksumHandoffFuncType(), data.data(),
data.size(), &checksum);
if (fs_->GetChecksumHandoffFuncType() != ChecksumType::kNoChecksum &&
checksum != verification_info.checksum.ToString()) {
std::string msg = "Data is corrupted! Origin data checksum: " +
verification_info.checksum.ToString() +
"current data checksum: " + checksum;
return IOStatus::Corruption(msg);
}
target_->PositionedAppend(data, offset, options, dbg);
return IOStatus::OK();
}

@ -65,9 +65,9 @@ class TestFSWritableFile : public FSWritableFile {
virtual ~TestFSWritableFile();
virtual IOStatus Append(const Slice& data, const IOOptions&,
IODebugContext*) override;
virtual IOStatus Append(const Slice& data, const IOOptions&,
virtual IOStatus Append(const Slice& data, const IOOptions& options,
const DataVerificationInfo& verification_info,
IODebugContext*) override;
IODebugContext* dbg) override;
virtual IOStatus Truncate(uint64_t size, const IOOptions& options,
IODebugContext* dbg) override {
return target_->Truncate(size, options, dbg);
@ -84,10 +84,8 @@ class TestFSWritableFile : public FSWritableFile {
}
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& options,
const DataVerificationInfo& /*verification_info*/,
IODebugContext* dbg) override {
return PositionedAppend(data, offset, options, dbg);
}
const DataVerificationInfo& verification_info,
IODebugContext* dbg) override;
virtual size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}

Loading…
Cancel
Save