Integrate blob file writing with the flush logic (#7345)

Summary:
The patch adds support for writing blob files during flush by integrating
`BlobFileBuilder` with the flush logic, most importantly, `BuildTable` and
`CompactionIterator`. If `enable_blob_files` is set, large values are extracted
to blob files and replaced with references. The resulting blob files are then
logged to the MANIFEST as part of the flush job's `VersionEdit` and
added to the `Version`, similarly to table files. Errors related to writing
blob files fail the flush, and any blob files written by such jobs are immediately
deleted (again, similarly to how SST files are handled). In addition, the patch
extends the logging and statistics around flushes to account for the presence
of blob files (e.g. `InternalStats::CompactionStats::bytes_written`, which is
used for calculating write amplification, now considers the blob files as well).
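
The following is a minimal sketch, based on the options used by the new `DBFlushTest::FlushWithBlob` test added in this patch, of how the feature is exercised through the public API. The database path, value sizes, and `min_blob_size` threshold are illustrative, and values are not read back with `Get` because, at the time of this patch, `Get` support for blob-resident values was not yet implemented (see the TODO in the test below).

```cpp
// Sketch only: enable blob file writing during flush.
// Assumptions: RocksDB built with blob support from this patch onward;
// the path and size threshold below are arbitrary examples.
#include <cassert>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.enable_blob_files = true;        // extract large values into blob files at flush time
  options.min_blob_size = 64;              // values of at least 64 bytes go to blob files
  options.disable_auto_compactions = true; // keep the example limited to the flush path

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/blob_flush_example", &db);
  assert(s.ok());

  // "short" stays inline in the SST; the 1 KB value is written to a blob
  // file during flush, and the SST stores a blob reference instead.
  s = db->Put(rocksdb::WriteOptions(), "key1", "short");
  assert(s.ok());
  s = db->Put(rocksdb::WriteOptions(), "key2", std::string(1024, 'x'));
  assert(s.ok());

  // The flush writes the table file and the blob file, and records both in
  // the MANIFEST via the flush job's VersionEdit.
  s = db->Flush(rocksdb::FlushOptions());
  assert(s.ok());

  delete db;
  return 0;
}
```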

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7345

Test Plan: Tested using `make check` and `db_bench`.

Reviewed By: riversand963

Differential Revision: D23506369

Pulled By: ltamasi

fbshipit-source-id: 646885f22dfbe063f650d38a1fedc132f499a159
Branch: main
Author: Levi Tamasi (committed by Facebook GitHub Bot)
Parent: d4993b9b60
Commit: b0e7834100
18 files changed (lines changed per file):

  db/blob/blob_file_builder.cc (18)
  db/blob/blob_file_builder.h (3)
  db/blob/blob_file_builder_test.cc (149)
  db/builder.cc (43)
  db/builder.h (7)
  db/compaction/compaction_iterator.cc (55)
  db/compaction/compaction_iterator.h (6)
  db/compaction/compaction_iterator_test.cc (3)
  db/compaction/compaction_job.cc (6)
  db/db_flush_test.cc (164)
  db/db_impl/db_impl_compaction_flush.cc (47)
  db/db_impl/db_impl_open.cc (10)
  db/flush_job.cc (39)
  db/internal_stats.h (2)
  db/memtable_list.cc (97)
  db/repair.cc (21)
  db/version_edit.cc (4)
  db/version_edit.h (16)

@ -31,12 +31,13 @@ BlobFileBuilder::BlobFileBuilder(
int job_id, uint32_t column_family_id,
const std::string& column_family_name, Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions)
: BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, env,
fs, immutable_cf_options, mutable_cf_options,
file_options, job_id, column_family_id,
column_family_name, io_priority, write_hint,
blob_file_additions) {}
blob_file_paths, blob_file_additions) {}
BlobFileBuilder::BlobFileBuilder(
std::function<uint64_t()> file_number_generator, Env* env, FileSystem* fs,
@ -45,6 +46,7 @@ BlobFileBuilder::BlobFileBuilder(
int job_id, uint32_t column_family_id,
const std::string& column_family_name, Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions)
: file_number_generator_(std::move(file_number_generator)),
env_(env),
@ -59,6 +61,7 @@ BlobFileBuilder::BlobFileBuilder(
column_family_name_(column_family_name),
io_priority_(io_priority),
write_hint_(write_hint),
blob_file_paths_(blob_file_paths),
blob_file_additions_(blob_file_additions),
blob_count_(0),
blob_bytes_(0) {
@ -67,7 +70,10 @@ BlobFileBuilder::BlobFileBuilder(
assert(fs_);
assert(immutable_cf_options_);
assert(file_options_);
assert(blob_file_paths_);
assert(blob_file_paths_->empty());
assert(blob_file_additions_);
assert(blob_file_additions_->empty());
}
BlobFileBuilder::~BlobFileBuilder() = default;
@ -145,7 +151,7 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
assert(immutable_cf_options_);
assert(!immutable_cf_options_->cf_paths.empty());
const std::string blob_file_path = BlobFileName(
std::string blob_file_path = BlobFileName(
immutable_cf_options_->cf_paths.front().path, blob_file_number);
std::unique_ptr<FSWritableFile> file;
@ -161,6 +167,12 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
}
}
// Note: files get added to blob_file_paths_ right after the open, so they
// can be cleaned up upon failure. Contrast this with blob_file_additions_,
// which only contains successfully written files.
assert(blob_file_paths_);
blob_file_paths_->emplace_back(std::move(blob_file_path));
assert(file);
file->SetIOPriority(io_priority_);
file->SetWriteLifeTimeHint(write_hint_);
@ -168,7 +180,7 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
Statistics* const statistics = immutable_cf_options_->statistics;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(file), blob_file_path, *file_options_, env_,
std::move(file), blob_file_paths_->back(), *file_options_, env_,
nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners,
immutable_cf_options_->file_checksum_gen_factory));

@ -36,6 +36,7 @@ class BlobFileBuilder {
const std::string& column_family_name,
Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions);
BlobFileBuilder(std::function<uint64_t()> file_number_generator, Env* env,
@ -47,6 +48,7 @@ class BlobFileBuilder {
const std::string& column_family_name,
Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions);
BlobFileBuilder(const BlobFileBuilder&) = delete;
@ -79,6 +81,7 @@ class BlobFileBuilder {
std::string column_family_name_;
Env::IOPriority io_priority_;
Env::WriteLifeTimeHint write_hint_;
std::vector<std::string>* blob_file_paths_;
std::vector<BlobFileAddition>* blob_file_additions_;
std::unique_ptr<BlobLogWriter> writer_;
uint64_t blob_count_;

@ -42,17 +42,15 @@ class BlobFileBuilderTest : public testing::Test {
protected:
BlobFileBuilderTest() : mock_env_(Env::Default()), fs_(&mock_env_) {}
void VerifyBlobFile(const ImmutableCFOptions& immutable_cf_options,
uint64_t blob_file_number, uint32_t column_family_id,
void VerifyBlobFile(uint64_t blob_file_number,
const std::string& blob_file_path,
uint32_t column_family_id,
CompressionType blob_compression_type,
const std::vector<std::pair<std::string, std::string>>&
expected_key_value_pairs,
const std::vector<std::string>& blob_indexes) {
assert(expected_key_value_pairs.size() == blob_indexes.size());
const std::string blob_file_path = BlobFileName(
immutable_cf_options.cf_paths.front().path, blob_file_number);
std::unique_ptr<FSRandomAccessFile> file;
constexpr IODebugContext* dbg = nullptr;
ASSERT_OK(
@ -137,12 +135,14 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
constexpr Env::IOPriority io_priority = Env::IO_HIGH;
constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM;
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
&blob_file_paths, &blob_file_additions);
std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
number_of_blobs);
@ -168,12 +168,20 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
ASSERT_OK(builder.Finish());
// Check the metadata generated
constexpr uint64_t blob_file_number = 2;
ASSERT_EQ(blob_file_paths.size(), 1);
const std::string& blob_file_path = blob_file_paths[0];
ASSERT_EQ(blob_file_path,
BlobFileName(immutable_cf_options.cf_paths.front().path,
blob_file_number));
ASSERT_EQ(blob_file_additions.size(), 1);
const auto& blob_file_addition = blob_file_additions[0];
constexpr uint64_t blob_file_number = 2;
ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), number_of_blobs);
ASSERT_EQ(
@ -181,7 +189,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
number_of_blobs * (BlobLogRecord::kHeaderSize + key_size + value_size));
// Verify the contents of the new blob file as well as the blob references
VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id,
VerifyBlobFile(blob_file_number, blob_file_path, column_family_id,
kNoCompression, expected_key_value_pairs, blob_indexes);
}
@ -210,12 +218,14 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
constexpr Env::IOPriority io_priority = Env::IO_HIGH;
constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM;
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
&blob_file_paths, &blob_file_additions);
std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
number_of_blobs);
@ -241,12 +251,19 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
ASSERT_OK(builder.Finish());
// Check the metadata generated
ASSERT_EQ(blob_file_paths.size(), number_of_blobs);
ASSERT_EQ(blob_file_additions.size(), number_of_blobs);
for (size_t i = 0; i < number_of_blobs; ++i) {
const uint64_t blob_file_number = i + 2;
ASSERT_EQ(blob_file_paths[i],
BlobFileName(immutable_cf_options.cf_paths.front().path,
blob_file_number));
const auto& blob_file_addition = blob_file_additions[i];
ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), i + 2);
ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1);
ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(),
BlobLogRecord::kHeaderSize + key_size + value_size);
@ -258,8 +275,8 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
expected_key_value_pairs[i]};
std::vector<std::string> blob_index{blob_indexes[i]};
VerifyBlobFile(immutable_cf_options, i + 2, column_family_id,
kNoCompression, expected_key_value_pair, blob_index);
VerifyBlobFile(i + 2, blob_file_paths[i], column_family_id, kNoCompression,
expected_key_value_pair, blob_index);
}
}
@ -286,12 +303,14 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {
constexpr Env::IOPriority io_priority = Env::IO_HIGH;
constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM;
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
&blob_file_paths, &blob_file_additions);
for (size_t i = 0; i < number_of_blobs; ++i) {
const std::string key = std::to_string(i);
@ -308,6 +327,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {
ASSERT_OK(builder.Finish());
// Check the metadata generated
ASSERT_TRUE(blob_file_paths.empty());
ASSERT_TRUE(blob_file_additions.empty());
}
@ -335,12 +355,14 @@ TEST_F(BlobFileBuilderTest, Compression) {
constexpr Env::IOPriority io_priority = Env::IO_HIGH;
constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM;
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
&blob_file_paths, &blob_file_additions);
const std::string key("1");
const std::string uncompressed_value(value_size, 'x');
@ -353,12 +375,20 @@ TEST_F(BlobFileBuilderTest, Compression) {
ASSERT_OK(builder.Finish());
// Check the metadata generated
constexpr uint64_t blob_file_number = 2;
ASSERT_EQ(blob_file_paths.size(), 1);
const std::string& blob_file_path = blob_file_paths[0];
ASSERT_EQ(blob_file_path,
BlobFileName(immutable_cf_options.cf_paths.front().path,
blob_file_number));
ASSERT_EQ(blob_file_additions.size(), 1);
const auto& blob_file_addition = blob_file_additions[0];
constexpr uint64_t blob_file_number = 2;
ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1);
@ -381,7 +411,7 @@ TEST_F(BlobFileBuilderTest, Compression) {
{key, compressed_value}};
std::vector<std::string> blob_indexes{blob_index};
VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id,
VerifyBlobFile(blob_file_number, blob_file_path, column_family_id,
kSnappyCompression, expected_key_value_pairs, blob_indexes);
}
@ -407,12 +437,14 @@ TEST_F(BlobFileBuilderTest, CompressionError) {
constexpr Env::IOPriority io_priority = Env::IO_HIGH;
constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM;
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
&blob_file_paths, &blob_file_additions);
SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue",
[](void* arg) {
@ -430,6 +462,15 @@ TEST_F(BlobFileBuilderTest, CompressionError) {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
constexpr uint64_t blob_file_number = 2;
ASSERT_EQ(blob_file_paths.size(), 1);
ASSERT_EQ(blob_file_paths[0],
BlobFileName(immutable_cf_options.cf_paths.front().path,
blob_file_number));
ASSERT_TRUE(blob_file_additions.empty());
}
TEST_F(BlobFileBuilderTest, Checksum) {
@ -473,12 +514,14 @@ TEST_F(BlobFileBuilderTest, Checksum) {
constexpr Env::IOPriority io_priority = Env::IO_HIGH;
constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM;
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
&blob_file_paths, &blob_file_additions);
const std::string key("1");
const std::string value("deadbeef");
@ -491,12 +534,20 @@ TEST_F(BlobFileBuilderTest, Checksum) {
ASSERT_OK(builder.Finish());
// Check the metadata generated
constexpr uint64_t blob_file_number = 2;
ASSERT_EQ(blob_file_paths.size(), 1);
const std::string& blob_file_path = blob_file_paths[0];
ASSERT_EQ(blob_file_path,
BlobFileName(immutable_cf_options.cf_paths.front().path,
blob_file_number));
ASSERT_EQ(blob_file_additions.size(), 1);
const auto& blob_file_addition = blob_file_additions[0];
constexpr uint64_t blob_file_number = 2;
ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1);
ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(),
@ -509,7 +560,7 @@ TEST_F(BlobFileBuilderTest, Checksum) {
{key, value}};
std::vector<std::string> blob_indexes{blob_index};
VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id,
VerifyBlobFile(blob_file_number, blob_file_path, column_family_id,
kNoCompression, expected_key_value_pairs, blob_indexes);
}
@ -561,13 +612,14 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {
constexpr Env::IOPriority io_priority = Env::IO_HIGH;
constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM;
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(TestFileNumberGenerator(), &fault_injection_env_,
&fs_, &immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
&blob_file_additions);
&blob_file_paths, &blob_file_additions);
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false,
@ -584,6 +636,19 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
if (sync_point_ == "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile") {
ASSERT_TRUE(blob_file_paths.empty());
} else {
constexpr uint64_t blob_file_number = 2;
ASSERT_EQ(blob_file_paths.size(), 1);
ASSERT_EQ(blob_file_paths[0],
BlobFileName(immutable_cf_options.cf_paths.front().path,
blob_file_number));
}
ASSERT_TRUE(blob_file_additions.empty());
}
} // namespace ROCKSDB_NAMESPACE

@ -13,6 +13,7 @@
#include <deque>
#include <vector>
#include "db/blob/blob_file_builder.h"
#include "db/compaction/compaction_iterator.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
@ -67,13 +68,14 @@ TableBuilder* NewTableBuilder(
}
Status BuildTable(
const std::string& dbname, Env* env, FileSystem* fs,
const std::string& dbname, VersionSet* versions, Env* env, FileSystem* fs,
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const FileOptions& file_options,
TableCache* table_cache, InternalIterator* iter,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters,
FileMetaData* meta, const InternalKeyComparator& internal_comparator,
FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
uint32_t column_family_id, const std::string& column_family_name,
@ -107,6 +109,7 @@ Status BuildTable(
std::string fname = TableFileName(ioptions.cf_paths, meta->fd.GetNumber(),
meta->fd.GetPathId());
std::vector<std::string> blob_file_paths;
std::string file_checksum = kUnknownFileChecksum;
std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
#ifndef ROCKSDB_LITE
@ -163,11 +166,22 @@ Status BuildTable(
snapshots.empty() ? 0 : snapshots.back(),
snapshot_checker);
std::unique_ptr<BlobFileBuilder> blob_file_builder(
(mutable_cf_options.enable_blob_files && blob_file_additions)
? new BlobFileBuilder(versions, env, fs, &ioptions,
&mutable_cf_options, &file_options, job_id,
column_family_id, column_family_name,
io_priority, write_hint, &blob_file_paths,
blob_file_additions)
: nullptr);
CompactionIterator c_iter(
iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber,
&snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
ShouldReportDetailedTime(env, ioptions.statistics),
true /* internal key corruption is not ok */, range_del_agg.get());
true /* internal key corruption is not ok */, range_del_agg.get(),
blob_file_builder.get());
c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key();
@ -200,9 +214,16 @@ Status BuildTable(
}
// Finish and check for builder errors
bool empty = builder->IsEmpty();
s = c_iter.status();
if (blob_file_builder) {
if (s.ok()) {
s = blob_file_builder->Finish();
}
}
TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable");
const bool empty = builder->IsEmpty();
if (!s.ok() || empty) {
builder->Abandon();
} else {
@ -290,7 +311,19 @@ Status BuildTable(
}
if (!s.ok() || meta->fd.GetFileSize() == 0) {
fs->DeleteFile(fname, IOOptions(), nullptr);
constexpr IODebugContext* dbg = nullptr;
fs->DeleteFile(fname, IOOptions(), dbg);
assert(blob_file_additions || blob_file_paths.empty());
if (blob_file_additions) {
for (const std::string& blob_file_path : blob_file_paths) {
fs->DeleteFile(blob_file_path, IOOptions(), dbg);
}
blob_file_additions->clear();
}
}
if (meta->fd.GetFileSize() == 0) {

@ -27,8 +27,10 @@ namespace ROCKSDB_NAMESPACE {
struct Options;
struct FileMetaData;
class VersionSet;
class Env;
struct EnvOptions;
class BlobFileAddition;
class Iterator;
class SnapshotChecker;
class TableCache;
@ -63,13 +65,14 @@ TableBuilder* NewTableBuilder(
// @param column_family_name Name of the column family that is also identified
// by column_family_id, or empty string if unknown.
extern Status BuildTable(
const std::string& dbname, Env* env, FileSystem* fs,
const std::string& dbname, VersionSet* versions, Env* env, FileSystem* fs,
const ImmutableCFOptions& options,
const MutableCFOptions& mutable_cf_options, const FileOptions& file_options,
TableCache* table_cache, InternalIterator* iter,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters,
FileMetaData* meta, const InternalKeyComparator& internal_comparator,
FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
uint32_t column_family_id, const std::string& column_family_name,

@ -3,9 +3,11 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/compaction/compaction_iterator.h"
#include <cinttypes>
#include "db/compaction/compaction_iterator.h"
#include "db/blob/blob_file_builder.h"
#include "db/snapshot_checker.h"
#include "port/likely.h"
#include "rocksdb/listener.h"
@ -36,7 +38,8 @@ CompactionIterator::CompactionIterator(
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, const Compaction* compaction,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
@ -46,6 +49,7 @@ CompactionIterator::CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg,
blob_file_builder,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
compaction_filter, shutting_down, preserve_deletes_seqnum,
@ -58,6 +62,7 @@ CompactionIterator::CompactionIterator(
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
@ -74,6 +79,7 @@ CompactionIterator::CompactionIterator(
report_detailed_time_(report_detailed_time),
expect_valid_internal_key_(expect_valid_internal_key),
range_del_agg_(range_del_agg),
blob_file_builder_(blob_file_builder),
compaction_(std::move(compaction)),
compaction_filter_(compaction_filter),
shutting_down_(shutting_down),
@ -661,20 +667,37 @@ void CompactionIterator::NextFromInput() {
void CompactionIterator::PrepareOutput() {
if (valid_) {
if (compaction_filter_ && ikey_.type == kTypeBlobIndex) {
const auto blob_decision = compaction_filter_->PrepareBlobOutput(
user_key(), value_, &compaction_filter_value_);
if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
status_ = Status::Corruption(
"Corrupted blob reference encountered during GC");
valid_ = false;
} else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
status_ = Status::IOError("Could not relocate blob during GC");
valid_ = false;
} else if (blob_decision ==
CompactionFilter::BlobDecision::kChangeValue) {
value_ = compaction_filter_value_;
if (ikey_.type == kTypeValue) {
if (blob_file_builder_) {
blob_index_.clear();
const Status s =
blob_file_builder_->Add(user_key(), value_, &blob_index_);
if (!s.ok()) {
status_ = s;
valid_ = false;
} else if (!blob_index_.empty()) {
value_ = blob_index_;
ikey_.type = kTypeBlobIndex;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
}
} else if (ikey_.type == kTypeBlobIndex) {
if (compaction_filter_) {
const auto blob_decision = compaction_filter_->PrepareBlobOutput(
user_key(), value_, &compaction_filter_value_);
if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
status_ = Status::Corruption(
"Corrupted blob reference encountered during GC");
valid_ = false;
} else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
status_ = Status::IOError("Could not relocate blob during GC");
valid_ = false;
} else if (blob_decision ==
CompactionFilter::BlobDecision::kChangeValue) {
value_ = compaction_filter_value_;
}
}
}

@ -21,6 +21,8 @@
namespace ROCKSDB_NAMESPACE {
class BlobFileBuilder;
class CompactionIterator {
public:
// A wrapper around Compaction. Has a much smaller interface, only what
@ -66,6 +68,7 @@ class CompactionIterator {
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
@ -81,6 +84,7 @@ class CompactionIterator {
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
@ -163,6 +167,7 @@ class CompactionIterator {
bool report_detailed_time_;
bool expect_valid_internal_key_;
CompactionRangeDelAggregator* range_del_agg_;
BlobFileBuilder* blob_file_builder_;
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
const std::atomic<bool>* shutting_down_;
@ -211,6 +216,7 @@ class CompactionIterator {
// PinnedIteratorsManager used to pin input_ Iterator blocks while reading
// merge operands and then releasing them after consuming them.
PinnedIteratorsManager pinned_iters_mgr_;
std::string blob_index_;
std::string compaction_filter_value_;
InternalKey compaction_filter_skip_until_;
// "level_ptrs" holds indices that remember which file of an associated

@ -258,7 +258,8 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
earliest_write_conflict_snapshot, snapshot_checker_.get(),
Env::Default(), false /* report_detailed_time */, false,
range_del_agg_.get(), std::move(compaction), filter, &shutting_down_));
range_del_agg_.get(), nullptr /* blob_file_builder */,
std::move(compaction), filter, &shutting_down_));
}
void AddSnapshot(SequenceNumber snapshot,

@ -914,9 +914,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, &range_del_agg,
sub_compact->compaction, compaction_filter, shutting_down_,
preserve_deletes_seqnum_, manual_compaction_paused_,
db_options_.info_log));
/* blob_file_builder */ nullptr, sub_compact->compaction,
compaction_filter, shutting_down_, preserve_deletes_seqnum_,
manual_compaction_paused_, db_options_.info_log));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {

@ -11,6 +11,7 @@
#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "file/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
@ -443,6 +444,169 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
}
#endif // !ROCKSDB_LITE
TEST_F(DBFlushTest, FlushWithBlob) {
constexpr uint64_t min_blob_size = 10;
Options options;
options.enable_blob_files = true;
options.min_blob_size = min_blob_size;
options.disable_auto_compactions = true;
Reopen(options);
constexpr char short_value[] = "short";
static_assert(sizeof(short_value) - 1 < min_blob_size,
"short_value too long");
constexpr char long_value[] = "long_value";
static_assert(sizeof(long_value) - 1 >= min_blob_size,
"long_value too short");
ASSERT_OK(Put("key1", short_value));
ASSERT_OK(Put("key2", long_value));
ASSERT_OK(Flush());
ASSERT_EQ(Get("key1"), short_value);
// TODO: enable once Get support is implemented for blobs
// ASSERT_EQ(Get("key2"), long_value);
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& l0_files = storage_info->LevelFiles(0);
ASSERT_EQ(l0_files.size(), 1);
const FileMetaData* const table_file = l0_files[0];
assert(table_file);
const auto& blob_files = storage_info->GetBlobFiles();
ASSERT_EQ(blob_files.size(), 1);
const auto& blob_file = blob_files.begin()->second;
assert(blob_file);
ASSERT_EQ(table_file->smallest.user_key(), "key1");
ASSERT_EQ(table_file->largest.user_key(), "key2");
ASSERT_EQ(table_file->fd.smallest_seqno, 1);
ASSERT_EQ(table_file->fd.largest_seqno, 2);
ASSERT_EQ(table_file->oldest_blob_file_number,
blob_file->GetBlobFileNumber());
ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);
#ifndef ROCKSDB_LITE
const InternalStats* const internal_stats = cfd->internal_stats();
assert(internal_stats);
const uint64_t expected_bytes =
table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_FALSE(compaction_stats.empty());
ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes);
ASSERT_EQ(compaction_stats[0].num_output_files, 2);
const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes);
#endif // ROCKSDB_LITE
}
class DBFlushTestBlobError : public DBFlushTest,
public testing::WithParamInterface<std::string> {
public:
DBFlushTestBlobError() : fault_injection_env_(env_) {}
~DBFlushTestBlobError() { Close(); }
FaultInjectionTestEnv fault_injection_env_;
};
INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError,
::testing::ValuesIn(std::vector<std::string>{
"BlobFileBuilder::WriteBlobToFile:AddRecord",
"BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
TEST_P(DBFlushTestBlobError, FlushError) {
Options options;
options.enable_blob_files = true;
options.disable_auto_compactions = true;
options.env = &fault_injection_env_;
Reopen(options);
ASSERT_OK(Put("key", "blob"));
SyncPoint::GetInstance()->SetCallBack(GetParam(), [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false, Status::IOError());
});
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:BeforeFinishBuildTable", [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(true);
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_NOK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& l0_files = storage_info->LevelFiles(0);
ASSERT_TRUE(l0_files.empty());
const auto& blob_files = storage_info->GetBlobFiles();
ASSERT_TRUE(blob_files.empty());
// Make sure the files generated by the failed job have been deleted
std::vector<std::string> files;
ASSERT_OK(env_->GetChildren(dbname_, &files));
for (const auto& file : files) {
uint64_t number = 0;
FileType type = kTableFile;
if (!ParseFileName(file, &number, &type)) {
continue;
}
ASSERT_NE(type, kTableFile);
ASSERT_NE(type, kBlobFile);
}
#ifndef ROCKSDB_LITE
const InternalStats* const internal_stats = cfd->internal_stats();
assert(internal_stats);
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_FALSE(compaction_stats.empty());
ASSERT_EQ(compaction_stats[0].bytes_written, 0);
ASSERT_EQ(compaction_stats[0].num_output_files, 0);
const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], 0);
#endif // ROCKSDB_LITE
}
TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
Options options = CurrentOptions();
options.create_if_missing = true;

@ -142,6 +142,7 @@ Status DBImpl::FlushMemTableToOutputFile(
SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
Env::Priority thread_pri) {
mutex_.AssertHeld();
assert(cfd);
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
@ -203,10 +204,28 @@ Status DBImpl::FlushMemTableToOutputFile(
if (made_progress) {
*made_progress = true;
}
const std::string& column_family_name = cfd->GetName();
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp));
column_family_name.c_str(),
storage_info->LevelSummary(&tmp));
const auto& blob_files = storage_info->GetBlobFiles();
if (!blob_files.empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
"\n",
column_family_name.c_str(), blob_files.begin()->first,
blob_files.rbegin()->first);
}
}
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
@ -544,16 +563,36 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
assert(num_cfs ==
static_cast<int>(job_context->superversion_contexts.size()));
for (int i = 0; i != num_cfs; ++i) {
assert(cfds[i]);
if (cfds[i]->IsDropped()) {
continue;
}
InstallSuperVersionAndScheduleWork(cfds[i],
&job_context->superversion_contexts[i],
all_mutable_cf_options[i]);
const std::string& column_family_name = cfds[i]->GetName();
Version* const current = cfds[i]->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
cfds[i]->GetName().c_str(),
cfds[i]->current()->storage_info()->LevelSummary(&tmp));
column_family_name.c_str(),
storage_info->LevelSummary(&tmp));
const auto& blob_files = storage_info->GetBlobFiles();
if (!blob_files.empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Blob file summary: head=%" PRIu64
", tail=%" PRIu64 "\n",
column_family_name.c_str(), blob_files.begin()->first,
blob_files.rbegin()->first);
}
}
if (made_progress) {
*made_progress = true;

@ -1272,7 +1272,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
@ -1319,11 +1321,13 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
IOStatus io_s;
s = BuildTable(
dbname_, env_, fs_.get(), *cfd->ioptions(), mutable_cf_options,
file_options_for_compaction_, cfd->table_cache(), iter.get(),
std::move(range_del_iters), &meta, cfd->internal_comparator(),
dbname_, versions_.get(), env_, fs_.get(), *cfd->ioptions(),
mutable_cf_options, file_options_for_compaction_, cfd->table_cache(),
iter.get(), std::move(range_del_iters), &meta,
nullptr /* blob_file_additions */, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),

@ -266,6 +266,13 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
stream << vstorage->NumLevelFiles(level);
}
stream.EndArray();
const auto& blob_files = vstorage->GetBlobFiles();
if (!blob_files.empty()) {
stream << "blob_file_head" << blob_files.begin()->first;
stream << "blob_file_tail" << blob_files.rbegin()->first;
}
stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
if (measure_io_stats_) {
@ -300,6 +307,9 @@ Status FlushJob::WriteLevel0Table() {
const uint64_t start_micros = db_options_.env->NowMicros();
const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000;
Status s;
std::vector<BlobFileAddition> blob_file_additions;
{
auto write_hint = cfd_->CalculateSSTWriteHint(0);
db_mutex_->Unlock();
@ -388,9 +398,10 @@ Status FlushJob::WriteLevel0Table() {
IOStatus io_s;
s = BuildTable(
dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(),
mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(),
std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
dbname_, versions_, db_options_.env, db_options_.fs.get(),
*cfd_->ioptions(), mutable_cf_options_, file_options_,
cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,
&blob_file_additions, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
cfd_->GetName(), existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_,
@ -425,7 +436,10 @@ Status FlushJob::WriteLevel0Table() {
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
if (s.ok() && meta_.fd.GetFileSize() > 0) {
const bool has_output = meta_.fd.GetFileSize() > 0;
assert(has_output || blob_file_additions.empty());
if (s.ok() && has_output) {
// if we have more than 1 background thread, then we cannot
// insert files directly into higher levels because some other
// threads could be concurrently producing compacted files for
@ -437,6 +451,8 @@ Status FlushJob::WriteLevel0Table() {
meta_.marked_for_compaction, meta_.oldest_blob_file_number,
meta_.oldest_ancester_time, meta_.file_creation_time,
meta_.file_checksum, meta_.file_checksum_func_name);
edit_->SetBlobFileAdditions(std::move(blob_file_additions));
}
#ifndef ROCKSDB_LITE
// Piggyback FlushJobInfo on the first first flushed memtable.
@ -447,11 +463,22 @@ Status FlushJob::WriteLevel0Table() {
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
stats.micros = db_options_.env->NowMicros() - start_micros;
stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
stats.bytes_written = meta_.fd.GetFileSize();
if (has_output) {
stats.bytes_written = meta_.fd.GetFileSize();
const auto& blobs = edit_->GetBlobFileAdditions();
for (const auto& blob : blobs) {
stats.bytes_written += blob.GetTotalBlobBytes();
}
stats.num_output_files = static_cast<int>(blobs.size()) + 1;
}
RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
meta_.fd.GetFileSize());
stats.bytes_written);
RecordFlushIOStats();
return s;
}

@ -392,6 +392,8 @@ class InternalStats {
bool GetIntPropertyOutOfMutex(const DBPropertyInfo& property_info,
Version* version, uint64_t* value);
const uint64_t* TEST_GetCFStatsValue() const { return cf_stats_value_; }
const std::vector<CompactionStats>& TEST_GetCompactionStats() const {
return comp_stats_;
}

@ -445,9 +445,18 @@ Status MemTableList::TryInstallMemtableFlushResults(
}
if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
batch_file_number = m->file_number_;
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64 " started",
cfd->GetName().c_str(), m->file_number_);
if (m->edit_.GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64 " started",
cfd->GetName().c_str(), m->file_number_);
} else {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
" (+%zu blob files) started",
cfd->GetName().c_str(), m->file_number_,
m->edit_.GetBlobFileAdditions().size());
}
edit_list.push_back(&m->edit_);
memtables_to_flush.push_back(m);
#ifndef ROCKSDB_LITE
@ -502,9 +511,20 @@ Status MemTableList::TryInstallMemtableFlushResults(
if (s.ok() && !cfd->IsDropped()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
if (m->edit_.GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
} else {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
" (+%zu blob files)"
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_,
m->edit_.GetBlobFileAdditions().size(), mem_id);
}
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
UpdateCachedValuesFromMemTableListVersion();
@ -515,9 +535,20 @@ Status MemTableList::TryInstallMemtableFlushResults(
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
MemTable* m = *it;
// commit failed. setup state so that we can flush again.
ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
if (m->edit_.GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
} else {
ROCKS_LOG_BUFFER(log_buffer,
"Level-0 commit table #%" PRIu64
" (+%zu blob files)"
": memtable #%" PRIu64 " failed",
m->file_number_,
m->edit_.GetBlobFileAdditions().size(), mem_id);
}
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
@ -713,11 +744,25 @@ Status InstallMemtableAtomicFlushResults(
for (auto m : *mems_list[i]) {
assert(m->GetFileNumber() > 0);
uint64_t mem_id = m->GetID();
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfds[i]->GetName().c_str(), m->GetFileNumber(),
mem_id);
const VersionEdit* const edit = m->GetEdits();
assert(edit);
if (edit->GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfds[i]->GetName().c_str(), m->GetFileNumber(),
mem_id);
} else {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
" (+%zu blob files)"
": memtable #%" PRIu64 " done",
cfds[i]->GetName().c_str(), m->GetFileNumber(),
edit->GetBlobFileAdditions().size(), mem_id);
}
imm->current_->Remove(m, to_delete);
imm->UpdateCachedValuesFromMemTableListVersion();
imm->ResetTrimHistoryNeeded();
@ -728,11 +773,25 @@ Status InstallMemtableAtomicFlushResults(
auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
for (auto m : *mems_list[i]) {
uint64_t mem_id = m->GetID();
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
cfds[i]->GetName().c_str(), m->GetFileNumber(),
mem_id);
const VersionEdit* const edit = m->GetEdits();
assert(edit);
if (edit->GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
cfds[i]->GetName().c_str(), m->GetFileNumber(),
mem_id);
} else {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
" (+%zu blob files)"
": memtable #%" PRIu64 " failed",
cfds[i]->GetName().c_str(), m->GetFileNumber(),
edit->GetBlobFileAdditions().size(), mem_id);
}
m->SetFlushCompleted(false);
m->SetFlushInProgress(false);
m->GetEdits()->Clear();

@ -429,18 +429,19 @@ class Repairer {
LegacyFileSystemWrapper fs(env_);
IOStatus io_s;
status = BuildTable(
dbname_, env_, &fs, *cfd->ioptions(),
dbname_, /* versions */ nullptr, env_, &fs, *cfd->ioptions(),
*cfd->GetLatestMutableCFOptions(), env_options_, table_cache_,
iter.get(), std::move(range_del_iters), &meta,
cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber,
snapshot_checker, kNoCompression, 0 /* sample_for_compression */,
CompressionOptions(), false, nullptr /* internal_stats */,
TableFileCreationReason::kRecovery, &io_s, nullptr /*IOTracer*/,
nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,
nullptr /* table_properties */, -1 /* level */, current_time,
0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */,
"DB Repairer" /* db_id */, db_session_id_);
nullptr /* blob_file_additions */, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
{}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
0 /* sample_for_compression */, CompressionOptions(), false,
nullptr /* internal_stats */, TableFileCreationReason::kRecovery,
&io_s, nullptr /*IOTracer*/, nullptr /* event_logger */,
0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, 0 /* oldest_key_time */, write_hint,
0 /* file_creation_time */, "DB Repairer" /* db_id */,
db_session_id_);
ROCKS_LOG_INFO(db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
log, counter, meta.fd.GetNumber(),

@ -543,7 +543,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
return s;
}
blob_file_additions_.emplace_back(blob_file_addition);
AddBlobFile(std::move(blob_file_addition));
break;
}
@ -554,7 +554,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
return s;
}
blob_file_garbages_.emplace_back(blob_file_garbage);
AddBlobFileGarbage(std::move(blob_file_garbage));
break;
}

@ -414,12 +414,20 @@ class VersionEdit {
std::move(checksum_method), std::move(checksum_value));
}
void AddBlobFile(BlobFileAddition blob_file_addition) {
blob_file_additions_.emplace_back(std::move(blob_file_addition));
}
// Retrieve all the blob files added.
using BlobFileAdditions = std::vector<BlobFileAddition>;
const BlobFileAdditions& GetBlobFileAdditions() const {
return blob_file_additions_;
}
void SetBlobFileAdditions(BlobFileAdditions blob_file_additions) {
blob_file_additions_ = std::move(blob_file_additions);
}
// Add garbage for an existing blob file. Note: intentionally broken English
// follows.
void AddBlobFileGarbage(uint64_t blob_file_number,
@ -429,12 +437,20 @@ class VersionEdit {
garbage_blob_bytes);
}
void AddBlobFileGarbage(BlobFileGarbage blob_file_garbage) {
blob_file_garbages_.emplace_back(std::move(blob_file_garbage));
}
// Retrieve all the blob file garbage added.
using BlobFileGarbages = std::vector<BlobFileGarbage>;
const BlobFileGarbages& GetBlobFileGarbages() const {
return blob_file_garbages_;
}
void SetBlobFileGarbages(BlobFileGarbages blob_file_garbages) {
blob_file_garbages_ = std::move(blob_file_garbages);
}
// Add a WAL (either just created or closed).
void AddWal(WalNumber number, WalMetadata metadata = WalMetadata()) {
wal_additions_.emplace_back(number, std::move(metadata));
