Add new Append API with DataVerificationInfo to Env WritableFile (#8071)

Summary:
Add the new Append and PositionedAppend API to env WritableFile. User is able to benefit from the write checksum handoff API when using the legacy Env classes. FileSystem already implemented the checksum handoff API.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8071

Test Plan: make check, added new unit test.

Reviewed By: anand1976

Differential Revision: D27177043

Pulled By: zhichao-cao

fbshipit-source-id: 430c8331fc81099fa6d00f4fff703b68b9e8080e
main
Zhichao Cao 4 years ago committed by Facebook GitHub Bot
parent 7ee41a5d25
commit dd0447ae2c
  1. 1
      HISTORY.md
  2. 26
      db/db_test_util.h
  3. 14
      env/composite_env.cc
  4. 37
      env/env_test.cc
  5. 35
      include/rocksdb/env.h
  6. 6
      tools/db_bench_tool.cc
  7. 9
      utilities/env_mirror.cc
  8. 10
      utilities/fault_injection_env.h

@ -15,6 +15,7 @@
* For the new integrated BlobDB implementation, compaction statistics now include the amount of data read from blob files during compaction (due to garbage collection or compaction filters). Write amplification metrics have also been extended to account for data read from blob files.
* Add EqualWithoutTimestamp() to Comparator.
* Extend support to track blob files in SSTFileManager whenever a blob file is created/deleted. Blob files will be scheduled to delete via SSTFileManager and SStFileManager will now take blob files in account while calculating size and space limits along with SST files.
* Add new Append and PositionedAppend API with checksum handoff to legacy Env.
### New Features
* Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicated that further action is necessary for compaction filter to make a decision.

@ -229,6 +229,11 @@ class SpecialEnv : public EnvWrapper {
return base_->Append(data);
}
}
Status Append(
const Slice& data,
const DataVerificationInfo& /* verification_info */) override {
return Append(data);
}
Status PositionedAppend(const Slice& data, uint64_t offset) override {
if (env_->table_write_callback_) {
(*env_->table_write_callback_)();
@ -243,6 +248,11 @@ class SpecialEnv : public EnvWrapper {
return base_->PositionedAppend(data, offset);
}
}
Status PositionedAppend(
const Slice& data, uint64_t offset,
const DataVerificationInfo& /* verification_info */) override {
return PositionedAppend(data, offset);
}
Status Truncate(uint64_t size) override { return base_->Truncate(size); }
Status RangeSync(uint64_t offset, uint64_t nbytes) override {
Status s = base_->RangeSync(offset, nbytes);
@ -305,6 +315,12 @@ class SpecialEnv : public EnvWrapper {
return base_->Append(data);
}
}
Status Append(
const Slice& data,
const DataVerificationInfo& /*verification_info*/) override {
return Append(data);
}
Status Truncate(uint64_t size) override { return base_->Truncate(size); }
Status Close() override { return base_->Close(); }
Status Flush() override { return base_->Flush(); }
@ -356,6 +372,11 @@ class SpecialEnv : public EnvWrapper {
#endif
return s;
}
Status Append(
const Slice& data,
const DataVerificationInfo& /* verification_info */) override {
return Append(data);
}
Status Truncate(uint64_t size) override { return base_->Truncate(size); }
Status Close() override {
// SyncPoint is not supported in Released Windows Mode.
@ -394,6 +415,11 @@ class SpecialEnv : public EnvWrapper {
OtherFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& b)
: env_(env), base_(std::move(b)) {}
Status Append(const Slice& data) override { return base_->Append(data); }
Status Append(
const Slice& data,
const DataVerificationInfo& /*verification_info*/) override {
return Append(data);
}
Status Truncate(uint64_t size) override { return base_->Truncate(size); }
Status Close() override { return base_->Close(); }
Status Flush() override { return base_->Flush(); }

@ -112,11 +112,25 @@ class CompositeWritableFileWrapper : public WritableFile {
IODebugContext dbg;
return target_->Append(data, io_opts, &dbg);
}
Status Append(const Slice& data,
const DataVerificationInfo& verification_info) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Append(data, io_opts, verification_info, &dbg);
}
Status PositionedAppend(const Slice& data, uint64_t offset) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->PositionedAppend(data, offset, io_opts, &dbg);
}
Status PositionedAppend(
const Slice& data, uint64_t offset,
const DataVerificationInfo& verification_info) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->PositionedAppend(data, offset, io_opts, verification_info,
&dbg);
}
Status Truncate(uint64_t size) override {
IOOptions io_opts;
IODebugContext dbg;

37
env/env_test.cc vendored

@ -40,6 +40,7 @@
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/string_util.h"
@ -1668,12 +1669,26 @@ TEST_P(EnvPosixTestWithParam, WritableFileWrapper) {
return Status::OK();
}
Status Append(
const Slice& /*data*/,
const DataVerificationInfo& /* verification_info */) override {
inc(1);
return Status::OK();
}
Status PositionedAppend(const Slice& /*data*/,
uint64_t /*offset*/) override {
inc(2);
return Status::OK();
}
Status PositionedAppend(
const Slice& /*data*/, uint64_t /*offset*/,
const DataVerificationInfo& /* verification_info */) override {
inc(2);
return Status::OK();
}
Status Truncate(uint64_t /*size*/) override {
inc(3);
return Status::OK();
@ -2224,6 +2239,28 @@ TEST_F(EnvTest, IsDirectory) {
ASSERT_FALSE(is_dir);
}
TEST_F(EnvTest, EnvWriteVerificationTest) {
Status s = Env::Default()->CreateDirIfMissing(test_directory_);
const std::string test_file_path = test_directory_ + "file1";
ASSERT_OK(s);
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::unique_ptr<WritableFile> file;
s = fault_fs_env->NewWritableFile(test_file_path, &file, EnvOptions());
ASSERT_OK(s);
DataVerificationInfo v_info;
std::string test_data = "test";
std::string checksum;
uint32_t v_crc32c = crc32c::Extend(0, test_data.c_str(), test_data.size());
PutFixed32(&checksum, v_crc32c);
v_info.checksum = Slice(checksum);
s = file->Append(Slice(test_data), v_info);
ASSERT_OK(s);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -48,6 +48,7 @@ class Logger;
class RandomAccessFile;
class SequentialFile;
class Slice;
struct DataVerificationInfo;
class WritableFile;
class RandomRWFile;
class MemoryMappedFileBuffer;
@ -801,6 +802,18 @@ class WritableFile {
// PositionedAppend, so the users cannot mix the two.
virtual Status Append(const Slice& data) = 0;
// Append data with verification information.
// Note that this API change is experimental and it might be changed in
// the future. Currently, RocksDB only generates crc32c based checksum for
// the file writes when the checksum handoff option is set.
// Expected behavior: if currently ChecksumType::kCRC32C is not supported by
// WritableFile, the information in DataVerificationInfo can be ignored
// (i.e. does not perform checksum verification).
virtual Status Append(const Slice& data,
const DataVerificationInfo& /* verification_info */) {
return Append(data);
}
// PositionedAppend data to the specified offset. The new EOF after append
// must be larger than the previous EOF. This is to be used when writes are
// not backed by OS buffers and hence has to always start from the start of
@ -827,6 +840,19 @@ class WritableFile {
"WritableFile::PositionedAppend() not supported.");
}
// PositionedAppend data with verification information.
// Note that this API change is experimental and it might be changed in
// the future. Currently, RocksDB only generates crc32c based checksum for
// the file writes when the checksum handoff option is set.
// Expected behavior: if currently ChecksumType::kCRC32C is not supported by
// WritableFile, the information in DataVerificationInfo can be ignored
// (i.e. does not perform checksum verification).
virtual Status PositionedAppend(
const Slice& /* data */, uint64_t /* offset */,
const DataVerificationInfo& /* verification_info */) {
return Status::NotSupported("PositionedAppend");
}
// Truncate is necessary to trim the file to the correct size
// before closing. It is not always possible to keep track of the file
// size due to whole pages writes. The behavior is undefined if called
@ -1540,9 +1566,18 @@ class WritableFileWrapper : public WritableFile {
explicit WritableFileWrapper(WritableFile* t) : target_(t) {}
Status Append(const Slice& data) override { return target_->Append(data); }
Status Append(const Slice& data,
const DataVerificationInfo& verification_info) override {
return target_->Append(data, verification_info);
}
Status PositionedAppend(const Slice& data, uint64_t offset) override {
return target_->PositionedAppend(data, offset);
}
Status PositionedAppend(
const Slice& data, uint64_t offset,
const DataVerificationInfo& verification_info) override {
return target_->PositionedAppend(data, offset, verification_info);
}
Status Truncate(uint64_t size) override { return target_->Truncate(size); }
Status Close() override { return target_->Close(); }
Status Flush() override { return target_->Flush(); }

@ -1543,6 +1543,12 @@ class ReportFileOpEnv : public EnvWrapper {
return rv;
}
Status Append(
const Slice& data,
const DataVerificationInfo& /* verification_info */) override {
return Append(data);
}
Status Truncate(uint64_t size) override { return target_->Truncate(size); }
Status Close() override { return target_->Close(); }
Status Flush() override { return target_->Flush(); }

@ -107,12 +107,21 @@ class WritableFileMirror : public WritableFile {
assert(as == bs);
return as;
}
Status Append(const Slice& data,
const DataVerificationInfo& /* verification_info */) override {
return Append(data);
}
Status PositionedAppend(const Slice& data, uint64_t offset) override {
Status as = a_->PositionedAppend(data, offset);
Status bs = b_->PositionedAppend(data, offset);
assert(as == bs);
return as;
}
Status PositionedAppend(
const Slice& data, uint64_t offset,
const DataVerificationInfo& /* verification_info */) override {
return PositionedAppend(data, offset);
}
Status Truncate(uint64_t size) override {
Status as = a_->Truncate(size);
Status bs = b_->Truncate(size);

@ -73,6 +73,11 @@ class TestWritableFile : public WritableFile {
FaultInjectionTestEnv* env);
virtual ~TestWritableFile();
virtual Status Append(const Slice& data) override;
virtual Status Append(
const Slice& data,
const DataVerificationInfo& /*verification_info*/) override {
return Append(data);
}
virtual Status Truncate(uint64_t size) override {
return target_->Truncate(size);
}
@ -84,6 +89,11 @@ class TestWritableFile : public WritableFile {
uint64_t offset) override {
return target_->PositionedAppend(data, offset);
}
virtual Status PositionedAppend(
const Slice& data, uint64_t offset,
const DataVerificationInfo& /*verification_info*/) override {
return PositionedAppend(data, offset);
}
virtual bool use_direct_io() const override {
return target_->use_direct_io();
};

Loading…
Cancel
Save