Fix a bug causing duplicate trailing entries in WritableFile (buffered IO) (#9236)

Summary:
`db_stress` is a user of `FaultInjectionTestFS`. After injecting a write error, `db_stress` probabilistically determines whether to drop data
(https://github.com/facebook/rocksdb/blob/6.27.fb/db_stress_tool/db_stress_test_base.cc#L2615:L2619).

In some of our recent runs of `db_stress`, we found duplicate trailing entries in the MANIFEST corresponding to a file
trivial move, causing recovery to fail because the file move operation is not idempotent: a file cannot be deleted
from a given level twice.

Investigation suggests that data buffering in both `WritableFileWriter` and `FaultInjectionTestFS` may be the root cause.

WritableFileWriter buffers data to write in a memory buffer, `WritableFileWriter::buf_`. After each
`WriteBuffered()`/`WriteBufferedWithChecksum()` succeeds, the `buf_` is cleared.

If the underlying file `WritableFileWriter::writable_file_` is opened in buffered IO mode, then `FaultInjectionTestFS`
buffers the data written to each file until the next file sync. After an injected error, the user of `FaultInjectionTestFS` can
choose to drop some or none of the previously buffered data. If `db_stress` does not drop any unsynced data, then
that data still exists in `FaultInjectionTestFS`'s buffer.

The existing implementation of `WritableFileWriter::WriteBuffered()` does not clear `buf_` if there is an error. This can
leave two buffered copies of the data: one in `WritableFileWriter` and another in `FaultInjectionTestFS`.
We also know that the `WritableFileWriter` of the MANIFEST file is closed upon an error. During `Close()`, it flushes the
content of `buf_`. If no write error is injected into `FaultInjectionTestFS` this time, then we end up with two copies of the
data appended to the file.

To fix this, we clear `WritableFileWriter::buf_` upon failure as well. This PR focuses on files opened in non-direct mode.

This PR includes a unit test that reproduces a case in which write error injection
into `WritableFile` causes duplicate trailing entries.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9236

Test Plan: make check

Reviewed By: zhichao-cao

Differential Revision: D33033984

Pulled By: riversand963

fbshipit-source-id: ebfa5a0db8cbf1ed73100528b34fcba543c5db31
Branch: main
Author: Yanqin Jin (committed by Facebook GitHub Bot)
Parent: 8a97c541e4
Commit: 08721293ea
Changed files:
1. HISTORY.md (1 line changed)
2. db/fault_injection_test.cc (74 lines changed)
3. file/writable_file_writer.cc (44 lines changed)
4. utilities/fault_injection_fs.h (2 lines changed)

@@ -7,6 +7,7 @@
 * Fixed a bug in rocksdb automatic implicit prefetching which got broken because of new feature adaptive_readahead and internal prefetching got disabled when iterator moves from one file to next.
 * Fixed a bug in TableOptions.prepopulate_block_cache which causes segmentation fault when used with TableOptions.partition_filters = true and TableOptions.cache_index_and_filter_blocks = true.
 * Fixed a bug affecting custom memtable factories which are not registered with the `ObjectRegistry`. The bug could result in failure to save the OPTIONS file.
+* Fixed a bug causing two duplicate entries to be appended to a file opened in non-direct mode and tracked by `FaultInjectionTestFS`.
 ### Behavior Changes
 * MemTableList::TrimHistory now use allocated bytes when max_write_buffer_size_to_maintain > 0(default in TrasactionDB, introduced in PR#5022) Fix #8371.

@@ -28,6 +28,9 @@
 #include "util/mutexlock.h"
 #include "util/random.h"
 #include "utilities/fault_injection_env.h"
+#ifndef NDEBUG
+#include "utilities/fault_injection_fs.h"
+#endif
 namespace ROCKSDB_NAMESPACE {
@@ -58,7 +61,6 @@ class FaultInjectionTest
   bool sequential_order_;
- protected:
  public:
   enum ExpectedVerifResult { kValExpectFound, kValExpectNoError };
   enum ResetMethod {
@@ -544,6 +546,76 @@ TEST_P(FaultInjectionTest, WriteBatchWalTerminationTest) {
   ASSERT_EQ(db_->Get(ro, "boys", &val), Status::NotFound());
 }
+
+TEST_P(FaultInjectionTest, NoDuplicateTrailingEntries) {
+  auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
+  fault_fs->EnableWriteErrorInjection();
+  fault_fs->SetFilesystemDirectWritable(false);
+  const std::string file_name = NormalizePath(dbname_ + "/test_file");
+  std::unique_ptr<log::Writer> log_writer = nullptr;
+  constexpr uint64_t log_number = 0;
+  {
+    std::unique_ptr<FSWritableFile> file;
+    const Status s =
+        fault_fs->NewWritableFile(file_name, FileOptions(), &file, nullptr);
+    ASSERT_OK(s);
+    std::unique_ptr<WritableFileWriter> fwriter(
+        new WritableFileWriter(std::move(file), file_name, FileOptions()));
+    log_writer.reset(new log::Writer(std::move(fwriter), log_number,
+                                     /*recycle_log_files=*/false));
+  }
+
+  fault_fs->SetRandomWriteError(
+      0xdeadbeef, /*one_in=*/1, IOStatus::IOError("Injected IOError"),
+      /*inject_for_all_file_types=*/true, /*types=*/{});
+  {
+    VersionEdit edit;
+    edit.SetColumnFamily(0);
+    std::string buf;
+    assert(edit.EncodeTo(&buf));
+    const Status s = log_writer->AddRecord(buf);
+    ASSERT_NOK(s);
+  }
+
+  fault_fs->DisableWriteErrorInjection();
+
+  // Closing the log writer will cause WritableFileWriter::Close() and flush
+  // remaining data from its buffer to underlying file.
+  log_writer.reset();
+
+  {
+    std::unique_ptr<FSSequentialFile> file;
+    Status s =
+        fault_fs->NewSequentialFile(file_name, FileOptions(), &file, nullptr);
+    ASSERT_OK(s);
+    std::unique_ptr<SequentialFileReader> freader(
+        new SequentialFileReader(std::move(file), file_name));
+
+    Status log_read_s;
+    class LogReporter : public log::Reader::Reporter {
+     public:
+      Status* status_;
+      explicit LogReporter(Status* _s) : status_(_s) {}
+      void Corruption(size_t /*bytes*/, const Status& _s) override {
+        if (status_->ok()) {
+          *status_ = _s;
+        }
+      }
+    } reporter(&log_read_s);
+
+    std::unique_ptr<log::Reader> log_reader(new log::Reader(
+        nullptr, std::move(freader), &reporter, /*checksum=*/true, log_number));
+    Slice record;
+    std::string data;
+    size_t count = 0;
+    while (log_reader->ReadRecord(&record, &data) && log_read_s.ok()) {
+      VersionEdit edit;
+      ASSERT_OK(edit.DecodeFrom(data));
+      ++count;
+    }
+    // Verify that only one version edit exists in the file.
+    ASSERT_EQ(1, count);
+  }
+}
+
 INSTANTIATE_TEST_CASE_P(
     FaultTest, FaultInjectionTest,
     ::testing::Values(std::make_tuple(false, kDefault, kEnd),

@@ -516,6 +516,19 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
       } else {
         s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
       }
+      if (!s.ok()) {
+        // If writable_file_->Append() failed, then the data may or may not
+        // exist in the underlying memory buffer, OS page cache, remote file
+        // system's buffer, etc. If WritableFileWriter keeps the data in
+        // buf_, then a future Close() or write retry may send the data to
+        // the underlying file again. If the data does exist in the
+        // underlying buffer and gets written to the file eventually despite
+        // returning error, the file may end up with two duplicate pieces of
+        // data. Therefore, clear the buf_ at the WritableFileWriter layer
+        // and let caller determine error handling.
+        buf_.Size(0);
+        buffered_data_crc32c_checksum_ = 0;
+      }
       SetPerfLevel(prev_perf_level);
     }
 #ifndef ROCKSDB_LITE
@@ -603,6 +616,17 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data,
     }
 #endif
+    if (!s.ok()) {
+      // If writable_file_->Append() failed, then the data may or may not
+      // exist in the underlying memory buffer, OS page cache, remote file
+      // system's buffer, etc. If WritableFileWriter keeps the data in
+      // buf_, then a future Close() or write retry may send the data to
+      // the underlying file again. If the data does exist in the
+      // underlying buffer and gets written to the file eventually despite
+      // returning error, the file may end up with two duplicate pieces of
+      // data. Therefore, clear the buf_ at the WritableFileWriter layer
+      // and let caller determine error handling.
+      buf_.Size(0);
+      buffered_data_crc32c_checksum_ = 0;
+      return s;
+    }
   }
@@ -652,13 +676,13 @@ IOStatus WritableFileWriter::WriteDirect() {
   assert((next_write_offset_ % alignment) == 0);
   // Calculate whole page final file advance if all writes succeed
-  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());
+  const size_t file_advance =
+      TruncateToPageBoundary(alignment, buf_.CurrentSize());
   // Calculate the leftover tail, we write it here padded with zeros BUT we
-  // will write
-  // it again in the future either on Close() OR when the current whole page
-  // fills out
-  size_t leftover_tail = buf_.CurrentSize() - file_advance;
+  // will write it again in the future either on Close() OR when the current
+  // whole page fills out.
+  const size_t leftover_tail = buf_.CurrentSize() - file_advance;
   // Round up and pad
   buf_.PadToAlignmentWith(0);
@@ -741,13 +765,13 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum() {
   assert((next_write_offset_ % alignment) == 0);
   // Calculate whole page final file advance if all writes succeed
-  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());
+  const size_t file_advance =
+      TruncateToPageBoundary(alignment, buf_.CurrentSize());
   // Calculate the leftover tail, we write it here padded with zeros BUT we
-  // will write
-  // it again in the future either on Close() OR when the current whole page
-  // fills out
-  size_t leftover_tail = buf_.CurrentSize() - file_advance;
+  // will write it again in the future either on Close() OR when the current
+  // whole page fills out.
+  const size_t leftover_tail = buf_.CurrentSize() - file_advance;
   // Round up, pad, and combine the checksum.
   size_t last_cur_size = buf_.CurrentSize();

@@ -507,6 +507,8 @@ class FaultInjectionTestFS : public FileSystemWrapper {
int read_error_one_in() const { return read_error_one_in_.load(); }
int write_error_one_in() const { return write_error_one_in_; }
// We capture a backtrace every time a fault is injected, for debugging
// purposes. This call prints the backtrace to stderr and frees the
// saved callstack
