Integrate BlobFileBuilder into the compaction process (#7573)

Summary:
Similarly to how https://github.com/facebook/rocksdb/issues/7345
integrated blob file writing into the flush process,
the patch adds support for writing blob files to the compaction logic.
Namely, if `enable_blob_files` is set, large values encountered during
compaction are extracted to blob files and replaced with blob indexes.
The resulting blob files are then logged to the MANIFEST as part of the
compaction job's `VersionEdit` and added to the `Version` alongside any
table files written by the compaction. Any errors during blob file building fail
the compaction job.

There will be a separate follow-up patch to perform blob garbage collection
during compactions.

In addition, the patch continues to chip away at the mess around computing
various compaction related statistics by eliminating some code duplication
and by making the `num_output_files` and `bytes_written` stats more consistent
for flushes, compactions, and recovery.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7573

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D24404696

Pulled By: ltamasi

fbshipit-source-id: 21216af3a172ad3ce8f85d11cd30923784ae426c
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent 25d54c799c
commit a7a04b6898
  1. 17
      db/builder.cc
  2. 202
      db/compaction/compaction_job.cc
  3. 5
      db/compaction/compaction_job.h
  4. 2
      db/compaction/compaction_job_test.cc
  5. 175
      db/db_compaction_test.cc
  6. 26
      db/db_flush_test.cc
  7. 16
      db/db_impl/db_impl_compaction_flush.cc
  8. 14
      db/db_impl/db_impl_open.cc
  9. 12
      db/db_wal_test.cc
  10. 14
      db/flush_job.cc

@ -212,6 +212,7 @@ Status BuildTable(
} else if (!c_iter.status().ok()) {
s = c_iter.status();
}
if (s.ok()) {
auto range_del_it = range_del_agg->NewIterator();
for (range_del_it->SeekToFirst(); range_del_it->Valid();
@ -222,10 +223,6 @@ Status BuildTable(
meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
tombstone.seq_, internal_comparator);
}
if (blob_file_builder) {
s = blob_file_builder->Finish();
}
}
TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable");
@ -273,6 +270,14 @@ Status BuildTable(
s = *io_status;
}
if (blob_file_builder) {
if (s.ok()) {
s = blob_file_builder->Finish();
}
blob_file_builder.reset();
}
// TODO Also check the IO status when create the Iterator.
if (s.ok() && !empty) {
@ -318,6 +323,8 @@ Status BuildTable(
}
if (!s.ok() || meta->fd.GetFileSize() == 0) {
TEST_SYNC_POINT("BuildTable:BeforeDeleteFile");
constexpr IODebugContext* dbg = nullptr;
Status ignored = fs->DeleteFile(fname, IOOptions(), dbg);
@ -330,8 +337,6 @@ Status BuildTable(
ignored = fs->DeleteFile(blob_file_path, IOOptions(), dbg);
ignored.PermitUncheckedError();
}
blob_file_additions->clear();
}
}

@ -20,6 +20,8 @@
#include <utility>
#include <vector>
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_file_builder.h"
#include "db/builder.h"
#include "db/db_impl/db_impl.h"
#include "db/db_iter.h"
@ -138,6 +140,7 @@ struct CompactionJob::SubcompactionState {
// State kept for output being generated
std::vector<Output> outputs;
std::vector<BlobFileAddition> blob_file_additions;
std::unique_ptr<WritableFileWriter> outfile;
std::unique_ptr<TableBuilder> builder;
@ -231,21 +234,13 @@ struct CompactionJob::CompactionState {
std::vector<CompactionJob::SubcompactionState> sub_compact_states;
Status status;
uint64_t total_bytes;
uint64_t num_output_records;
explicit CompactionState(Compaction* c)
: compaction(c),
total_bytes(0),
num_output_records(0) {}
size_t num_output_files = 0;
uint64_t total_bytes = 0;
size_t num_blob_output_files = 0;
uint64_t total_blob_bytes = 0;
uint64_t num_output_records = 0;
size_t NumOutputFiles() {
size_t total = 0;
for (auto& s : sub_compact_states) {
total += s.outputs.size();
}
return total;
}
explicit CompactionState(Compaction* c) : compaction(c) {}
Slice SmallestUserKey() {
for (const auto& sub_compact_state : sub_compact_states) {
@ -272,11 +267,29 @@ struct CompactionJob::CompactionState {
};
void CompactionJob::AggregateStatistics() {
assert(compact_);
for (SubcompactionState& sc : compact_->sub_compact_states) {
auto& outputs = sc.outputs;
if (!outputs.empty() && !outputs.back().meta.fd.file_size) {
// An error occurred, so ignore the last output.
outputs.pop_back();
}
compact_->num_output_files += outputs.size();
compact_->total_bytes += sc.total_bytes;
const auto& blobs = sc.blob_file_additions;
compact_->num_blob_output_files += blobs.size();
for (const auto& blob : blobs) {
compact_->total_blob_bytes += blob.GetTotalBlobBytes();
}
compact_->num_output_records += sc.num_output_records;
}
for (SubcompactionState& sc : compact_->sub_compact_states) {
compaction_job_stats_->Add(sc.compaction_job_stats);
}
}
@ -286,7 +299,8 @@ CompactionJob::CompactionJob(
const FileOptions& file_options, VersionSet* versions,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
FSDirectory* db_directory, FSDirectory* output_directory, Statistics* stats,
FSDirectory* db_directory, FSDirectory* output_directory,
FSDirectory* blob_output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
@ -317,6 +331,7 @@ CompactionJob::CompactionJob(
log_buffer_(log_buffer),
db_directory_(db_directory),
output_directory_(output_directory),
blob_output_directory_(blob_output_directory),
stats_(stats),
db_mutex_(db_mutex),
db_error_handler_(db_error_handler),
@ -604,18 +619,34 @@ Status CompactionJob::Run() {
// Check if any thread encountered an error during execution
Status status;
IOStatus io_s;
bool wrote_new_blob_files = false;
for (const auto& state : compact_->sub_compact_states) {
if (!state.status.ok()) {
status = state.status;
io_s = state.io_status;
break;
}
if (!state.blob_file_additions.empty()) {
wrote_new_blob_files = true;
}
}
if (io_status_.ok()) {
io_status_ = io_s;
}
if (status.ok() && output_directory_) {
io_s = output_directory_->Fsync(IOOptions(), nullptr);
if (status.ok()) {
constexpr IODebugContext* dbg = nullptr;
if (output_directory_) {
io_s = output_directory_->Fsync(IOOptions(), dbg);
}
if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
blob_output_directory_ != output_directory_) {
io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
}
}
if (io_status_.ok()) {
io_status_ = io_s;
@ -721,6 +752,7 @@ Status CompactionJob::Run() {
// Finish up all book-keeping to unify the subcompaction results
AggregateStatistics();
UpdateCompactionStats();
RecordCompactionIOStats();
LogFlush(db_options_.info_log);
TEST_SYNC_POINT("CompactionJob::Run():End");
@ -730,11 +762,16 @@ Status CompactionJob::Run() {
}
Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
assert(compact_);
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_INSTALL);
db_mutex_->AssertHeld();
Status status = compact_->status;
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
assert(cfd);
cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), thread_pri_, compaction_stats_);
@ -744,6 +781,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
if (!versions_->io_status().ok()) {
io_status_ = versions_->io_status();
}
VersionStorageInfo::LevelSummaryStorage tmp;
auto vstorage = cfd->current()->storage_info();
const auto& stats = compaction_stats_;
@ -768,6 +806,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
stats.bytes_written / static_cast<double>(stats.micros);
}
const std::string& column_family_name = cfd->GetName();
ROCKS_LOG_BUFFER(
log_buffer_,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
@ -775,8 +815,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s, records in: %" PRIu64
", records dropped: %" PRIu64 " output_compression: %s\n",
cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), bytes_read_per_sec,
bytes_written_per_sec, compact_->compaction->output_level(),
column_family_name.c_str(), vstorage->LevelSummary(&tmp),
bytes_read_per_sec, bytes_written_per_sec,
compact_->compaction->output_level(),
stats.num_input_files_in_non_output_levels,
stats.num_input_files_in_output_level, stats.num_output_files,
stats.bytes_read_non_output_levels / 1048576.0,
@ -787,6 +828,15 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
CompressionTypeToString(compact_->compaction->output_compression())
.c_str());
const auto& blob_files = vstorage->GetBlobFiles();
if (!blob_files.empty()) {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
"\n",
column_family_name.c_str(), blob_files.begin()->first,
blob_files.rbegin()->first);
}
UpdateCompactionJobStats(stats);
auto stream = event_logger_->LogToBuffer(log_buffer_);
@ -795,11 +845,18 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
<< "compaction_time_micros" << stats.micros
<< "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
<< compact_->compaction->output_level() << "num_output_files"
<< compact_->NumOutputFiles() << "total_output_size"
<< compact_->total_bytes << "num_input_records"
<< stats.num_input_records << "num_output_records"
<< compact_->num_output_records << "num_subcompactions"
<< compact_->sub_compact_states.size() << "output_compression"
<< compact_->num_output_files << "total_output_size"
<< compact_->total_bytes;
if (compact_->num_blob_output_files > 0) {
stream << "num_blob_output_files" << compact_->num_blob_output_files
<< "total_blob_output_size" << compact_->total_blob_bytes;
}
stream << "num_input_records" << stats.num_input_records
<< "num_output_records" << compact_->num_output_records
<< "num_subcompactions" << compact_->sub_compact_states.size()
<< "output_compression"
<< CompressionTypeToString(compact_->compaction->output_compression());
stream << "num_single_delete_mismatches"
@ -823,12 +880,18 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
}
stream.EndArray();
if (!blob_files.empty()) {
stream << "blob_file_head" << blob_files.begin()->first;
stream << "blob_file_tail" << blob_files.rbegin()->first;
}
CleanupCompaction();
return status;
}
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact != nullptr);
assert(sub_compact);
assert(sub_compact->compaction);
uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
@ -899,6 +962,22 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
snapshot_checker_, compact_->compaction->level(),
db_options_.statistics.get());
const MutableCFOptions* mutable_cf_options =
sub_compact->compaction->mutable_cf_options();
assert(mutable_cf_options);
std::vector<std::string> blob_file_paths;
std::unique_ptr<BlobFileBuilder> blob_file_builder(
mutable_cf_options->enable_blob_files
? new BlobFileBuilder(
versions_, env_, fs_.get(),
sub_compact->compaction->immutable_cf_options(),
mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
&blob_file_paths, &sub_compact->blob_file_additions)
: nullptr);
TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:1",
@ -921,7 +1000,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, &range_del_agg,
/* blob_file_builder */ nullptr, db_options_.allow_data_in_errors,
blob_file_builder.get(), db_options_.allow_data_in_errors,
sub_compact->compaction, compaction_filter, shutting_down_,
preserve_deletes_seqnum_, manual_compaction_paused_,
db_options_.info_log));
@ -1093,6 +1172,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
}
if (blob_file_builder) {
if (status.ok()) {
status = blob_file_builder->Finish();
}
blob_file_builder.reset();
}
sub_compact->compaction_job_stats.cpu_micros =
env_->NowCPUNanos() / 1000 - prev_cpu_micros;
@ -1479,9 +1566,13 @@ Status CompactionJob::FinishCompactionOutputFile(
Status CompactionJob::InstallCompactionResults(
const MutableCFOptions& mutable_cf_options) {
assert(compact_);
db_mutex_->AssertHeld();
auto* compaction = compact_->compaction;
assert(compaction);
// paranoia: verify that the files that we started with
// still exist in the current version and in the same original level.
// This ensures that a concurrent compaction did not erroneously
@ -1497,23 +1588,32 @@ Status CompactionJob::InstallCompactionResults(
{
Compaction::InputLevelSummaryBuffer inputs_summary;
ROCKS_LOG_INFO(
db_options_.info_log, "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction->InputLevelSummary(&inputs_summary),
compact_->total_bytes + compact_->total_blob_bytes);
}
VersionEdit* const edit = compaction->edit();
assert(edit);
// Add compaction inputs
compaction->AddInputDeletions(compact_->compaction->edit());
compaction->AddInputDeletions(edit);
for (const auto& sub_compact : compact_->sub_compact_states) {
for (const auto& out : sub_compact.outputs) {
compaction->edit()->AddFile(compaction->output_level(), out.meta);
edit->AddFile(compaction->output_level(), out.meta);
}
for (const auto& blob : sub_compact.blob_file_additions) {
edit->AddBlobFile(blob);
}
}
return versions_->LogAndApply(compaction->column_family_data(),
mutable_cf_options, compaction->edit(),
db_mutex_, db_directory_);
mutable_cf_options, edit, db_mutex_,
db_directory_);
}
void CompactionJob::RecordCompactionIOStats() {
@ -1689,6 +1789,8 @@ void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
#endif // !ROCKSDB_LITE
void CompactionJob::UpdateCompactionStats() {
assert(compact_);
Compaction* compaction = compact_->compaction;
compaction_stats_.num_input_files_in_non_output_levels = 0;
compaction_stats_.num_input_files_in_output_level = 0;
@ -1706,27 +1808,15 @@ void CompactionJob::UpdateCompactionStats() {
}
}
uint64_t num_output_records = 0;
for (const auto& sub_compact : compact_->sub_compact_states) {
size_t num_output_files = sub_compact.outputs.size();
if (sub_compact.builder != nullptr) {
// An error occurred so ignore the last output.
assert(num_output_files > 0);
--num_output_files;
}
compaction_stats_.num_output_files += static_cast<int>(num_output_files);
num_output_records += sub_compact.num_output_records;
for (const auto& out : sub_compact.outputs) {
compaction_stats_.bytes_written += out.meta.fd.file_size;
}
}
compaction_stats_.num_output_files =
static_cast<int>(compact_->num_output_files) +
static_cast<int>(compact_->num_blob_output_files);
compaction_stats_.bytes_written =
compact_->total_bytes + compact_->total_blob_bytes;
if (compaction_stats_.num_input_records > num_output_records) {
if (compaction_stats_.num_input_records > compact_->num_output_records) {
compaction_stats_.num_dropped_records =
compaction_stats_.num_input_records - num_output_records;
compaction_stats_.num_input_records - compact_->num_output_records;
}
}
@ -1765,7 +1855,7 @@ void CompactionJob::UpdateCompactionJobStats(
compaction_job_stats_->num_output_records = compact_->num_output_records;
compaction_job_stats_->num_output_files = stats.num_output_files;
if (compact_->NumOutputFiles() > 0U) {
if (stats.num_output_files > 0) {
CopyPrefix(compact_->SmallestUserKey(),
CompactionJobStats::kMaxPrefixLength,
&compaction_job_stats_->smallest_output_key_prefix);

@ -68,8 +68,8 @@ class CompactionJob {
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
FSDirectory* db_directory, FSDirectory* output_directory,
Statistics* stats, InstrumentedMutex* db_mutex,
ErrorHandler* db_error_handler,
FSDirectory* blob_output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker,
@ -169,6 +169,7 @@ class CompactionJob {
LogBuffer* log_buffer_;
FSDirectory* db_directory_;
FSDirectory* output_directory_;
FSDirectory* blob_output_directory_;
Statistics* stats_;
InstrumentedMutex* db_mutex_;
ErrorHandler* db_error_handler_;

@ -340,7 +340,7 @@ class CompactionJobTest : public testing::Test {
CompactionJob compaction_job(
0, &compaction, db_options_, env_options_, versions_.get(),
&shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr,
nullptr, nullptr, &mutex_, &error_handler_, snapshots,
nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger, false, false, dbname_, &compaction_job_stats_,
Env::Priority::USER, nullptr /* IOTracer */);

@ -5845,6 +5845,181 @@ TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) {
ASSERT_EQ("0,5", FilesPerLevel(0));
}
TEST_F(DBCompactionTest, CompactionWithBlob) {
Options options;
options.env = env_;
options.disable_auto_compactions = true;
Reopen(options);
constexpr char first_key[] = "first_key";
constexpr char second_key[] = "second_key";
constexpr char first_value[] = "first_value";
constexpr char second_value[] = "second_value";
constexpr char third_value[] = "third_value";
ASSERT_OK(Put(first_key, first_value));
ASSERT_OK(Put(second_key, first_value));
ASSERT_OK(Flush());
ASSERT_OK(Put(first_key, second_value));
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Flush());
ASSERT_OK(Put(first_key, third_value));
ASSERT_OK(Put(second_key, third_value));
ASSERT_OK(Flush());
options.enable_blob_files = true;
Reopen(options);
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
ASSERT_EQ(Get(first_key), third_value);
ASSERT_EQ(Get(second_key), third_value);
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& l1_files = storage_info->LevelFiles(1);
ASSERT_EQ(l1_files.size(), 1);
const FileMetaData* const table_file = l1_files[0];
assert(table_file);
const auto& blob_files = storage_info->GetBlobFiles();
ASSERT_EQ(blob_files.size(), 1);
const auto& blob_file = blob_files.begin()->second;
assert(blob_file);
ASSERT_EQ(table_file->smallest.user_key(), first_key);
ASSERT_EQ(table_file->largest.user_key(), second_key);
ASSERT_EQ(table_file->oldest_blob_file_number,
blob_file->GetBlobFileNumber());
ASSERT_EQ(blob_file->GetTotalBlobCount(), 2);
const InternalStats* const internal_stats = cfd->internal_stats();
assert(internal_stats);
const uint64_t expected_bytes =
table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
ASSERT_EQ(compaction_stats[1].bytes_written, expected_bytes);
ASSERT_EQ(compaction_stats[1].num_output_files, 2);
}
class DBCompactionTestBlobError
: public DBCompactionTest,
public testing::WithParamInterface<std::string> {
public:
DBCompactionTestBlobError()
: fault_injection_env_(env_), sync_point_(GetParam()) {}
~DBCompactionTestBlobError() { Close(); }
FaultInjectionTestEnv fault_injection_env_;
std::string sync_point_;
};
INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobError, DBCompactionTestBlobError,
::testing::ValuesIn(std::vector<std::string>{
"BlobFileBuilder::WriteBlobToFile:AddRecord",
"BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
TEST_P(DBCompactionTestBlobError, CompactionError) {
Options options;
options.disable_auto_compactions = true;
options.env = env_;
Reopen(options);
constexpr char first_key[] = "first_key";
constexpr char second_key[] = "second_key";
constexpr char first_value[] = "first_value";
constexpr char second_value[] = "second_value";
constexpr char third_value[] = "third_value";
ASSERT_OK(Put(first_key, first_value));
ASSERT_OK(Put(second_key, first_value));
ASSERT_OK(Flush());
ASSERT_OK(Put(first_key, second_value));
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Flush());
ASSERT_OK(Put(first_key, third_value));
ASSERT_OK(Put(second_key, third_value));
ASSERT_OK(Flush());
options.enable_blob_files = true;
options.env = &fault_injection_env_;
Reopen(options);
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false,
Status::IOError(sync_point_));
});
SyncPoint::GetInstance()->EnableProcessing();
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), begin, end).IsIOError());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& l1_files = storage_info->LevelFiles(1);
ASSERT_TRUE(l1_files.empty());
const auto& blob_files = storage_info->GetBlobFiles();
ASSERT_TRUE(blob_files.empty());
const InternalStats* const internal_stats = cfd->internal_stats();
assert(internal_stats);
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
ASSERT_EQ(compaction_stats[1].bytes_written, 0);
ASSERT_EQ(compaction_stats[1].num_output_files, 0);
} else {
// SST file writing succeeded; blob file writing failed (during Finish)
ASSERT_GT(compaction_stats[1].bytes_written, 0);
ASSERT_EQ(compaction_stats[1].num_output_files, 1);
}
}
#endif // !defined(ROCKSDB_LITE)
} // namespace ROCKSDB_NAMESPACE

@ -453,6 +453,7 @@ TEST_F(DBFlushTest, FlushWithBlob) {
options.enable_blob_files = true;
options.min_blob_size = min_blob_size;
options.disable_auto_compactions = true;
options.env = env_;
Reopen(options);
@ -525,10 +526,12 @@ TEST_F(DBFlushTest, FlushWithBlob) {
class DBFlushTestBlobError : public DBFlushTest,
public testing::WithParamInterface<std::string> {
public:
DBFlushTestBlobError() : fault_injection_env_(env_) {}
DBFlushTestBlobError()
: fault_injection_env_(env_), sync_point_(GetParam()) {}
~DBFlushTestBlobError() { Close(); }
FaultInjectionTestEnv fault_injection_env_;
std::string sync_point_;
};
INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError,
@ -546,11 +549,12 @@ TEST_P(DBFlushTestBlobError, FlushError) {
ASSERT_OK(Put("key", "blob"));
SyncPoint::GetInstance()->SetCallBack(GetParam(), [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false, Status::IOError());
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false,
Status::IOError(sync_point_));
});
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:BeforeFinishBuildTable", [this](void* /* arg */) {
"BuildTable:BeforeDeleteFile", [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(true);
});
SyncPoint::GetInstance()->EnableProcessing();
@ -599,11 +603,19 @@ TEST_P(DBFlushTestBlobError, FlushError) {
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_FALSE(compaction_stats.empty());
ASSERT_EQ(compaction_stats[0].bytes_written, 0);
ASSERT_EQ(compaction_stats[0].num_output_files, 0);
if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
ASSERT_EQ(compaction_stats[0].bytes_written, 0);
ASSERT_EQ(compaction_stats[0].num_output_files, 0);
} else {
// SST file writing succeeded; blob file writing failed (during Finish)
ASSERT_GT(compaction_stats[0].bytes_written, 0);
ASSERT_EQ(compaction_stats[0].num_output_files, 1);
}
const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], 0);
ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
compaction_stats[0].bytes_written);
#endif // ROCKSDB_LITE
}

@ -1147,9 +1147,10 @@ Status DBImpl::CompactFilesImpl(
job_context->job_id, c.get(), immutable_db_options_,
file_options_for_compaction_, versions_.get(), &shutting_down_,
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
&error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
GetDataDir(c->column_family_data(), c->output_path_id()),
GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, io_tracer_,
@ -2954,10 +2955,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
job_context->job_id, c.get(), immutable_db_options_,
file_options_for_compaction_, versions_.get(), &shutting_down_,
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
&mutex_, &error_handler_, snapshot_seqs,
earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
GetDataDir(c->column_family_data(), c->output_path_id()),
GetDataDir(c->column_family_data(), 0), stats_, &mutex_,
&error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri, io_tracer_,
is_manual ? &manual_compaction_paused_ : nullptr, db_id_,

@ -1393,7 +1393,6 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
const bool has_output = meta.fd.GetFileSize() > 0;
assert(has_output || blob_file_additions.empty());
constexpr int level = 0;
@ -1413,15 +1412,16 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
if (has_output) {
stats.bytes_written = meta.fd.GetFileSize();
stats.num_output_files = 1;
}
const auto& blobs = edit->GetBlobFileAdditions();
for (const auto& blob : blobs) {
stats.bytes_written += blob.GetTotalBlobBytes();
}
stats.num_output_files = static_cast<int>(blobs.size()) + 1;
const auto& blobs = edit->GetBlobFileAdditions();
for (const auto& blob : blobs) {
stats.bytes_written += blob.GetTotalBlobBytes();
}
stats.num_output_files += static_cast<int>(blobs.size());
cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
stats.bytes_written);

@ -380,6 +380,7 @@ TEST_F(DBWALTest, RecoverWithBlob) {
options.min_blob_size = min_blob_size;
options.avoid_flush_during_recovery = false;
options.disable_auto_compactions = true;
options.env = env_;
Reopen(options);
@ -440,10 +441,12 @@ class DBRecoveryTestBlobError
: public DBWALTest,
public testing::WithParamInterface<std::string> {
public:
DBRecoveryTestBlobError() : fault_injection_env_(env_) {}
DBRecoveryTestBlobError()
: fault_injection_env_(env_), sync_point_(GetParam()) {}
~DBRecoveryTestBlobError() { Close(); }
FaultInjectionTestEnv fault_injection_env_;
std::string sync_point_;
};
INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError,
@ -457,11 +460,12 @@ TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) {
// Reopen with blob files enabled but make blob file writing fail during
// recovery.
SyncPoint::GetInstance()->SetCallBack(GetParam(), [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false, Status::IOError());
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false,
Status::IOError(sync_point_));
});
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:BeforeFinishBuildTable", [this](void* /* arg */) {
"BuildTable:BeforeDeleteFile", [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(true);
});
SyncPoint::GetInstance()->EnableProcessing();

@ -438,7 +438,6 @@ Status FlushJob::WriteLevel0Table() {
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
const bool has_output = meta_.fd.GetFileSize() > 0;
assert(has_output || blob_file_additions.empty());
if (s.ok() && has_output) {
// if we have more than 1 background thread, then we cannot
@ -467,15 +466,16 @@ Status FlushJob::WriteLevel0Table() {
if (has_output) {
stats.bytes_written = meta_.fd.GetFileSize();
stats.num_output_files = 1;
}
const auto& blobs = edit_->GetBlobFileAdditions();
for (const auto& blob : blobs) {
stats.bytes_written += blob.GetTotalBlobBytes();
}
stats.num_output_files = static_cast<int>(blobs.size()) + 1;
const auto& blobs = edit_->GetBlobFileAdditions();
for (const auto& blob : blobs) {
stats.bytes_written += blob.GetTotalBlobBytes();
}
stats.num_output_files += static_cast<int>(blobs.size());
RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,

Loading…
Cancel
Save