Integrate blob file writing with recovery (#7388)

Summary:
The patch adds support for extracting large values into blob files when
performing a flush during recovery (when `avoid_flush_during_recovery` is
`false`). Blob files are built and added to the `Version` similarly to flush.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7388

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D23709912

Pulled By: ltamasi

fbshipit-source-id: ce48b4227849cf25429ae98574e72b0e1cb9c67d
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent 67bd5401e9
commit bf1aeebb6c
  1. 33
      db/db_impl/db_impl_open.cc
  2. 157
      db/db_wal_test.cc

@ -1274,6 +1274,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
const uint64_t start_micros = env_->NowMicros(); const uint64_t start_micros = env_->NowMicros();
FileMetaData meta; FileMetaData meta;
std::vector<BlobFileAddition> blob_file_additions;
std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem( std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
new std::list<uint64_t>::iterator( new std::list<uint64_t>::iterator(
@ -1326,10 +1327,10 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
s = BuildTable( s = BuildTable(
dbname_, versions_.get(), env_, fs_.get(), *cfd->ioptions(), dbname_, versions_.get(), env_, fs_.get(), *cfd->ioptions(),
mutable_cf_options, file_options_for_compaction_, cfd->table_cache(), mutable_cf_options, file_options_for_compaction_, cfd->table_cache(),
iter.get(), std::move(range_del_iters), &meta, iter.get(), std::move(range_del_iters), &meta, &blob_file_additions,
nullptr /* blob_file_additions */, cfd->internal_comparator(), cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), cfd->GetID(), cfd->GetName(), snapshot_seqs,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, earliest_write_conflict_snapshot, snapshot_checker,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
mutable_cf_options.sample_for_compression, mutable_cf_options.sample_for_compression,
mutable_cf_options.compression_opts, paranoid_file_checks, mutable_cf_options.compression_opts, paranoid_file_checks,
@ -1351,23 +1352,39 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
// Note that if file_size is zero, the file has been deleted and // Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest. // should not be added to the manifest.
int level = 0; const bool has_output = meta.fd.GetFileSize() > 0;
if (s.ok() && meta.fd.GetFileSize() > 0) { assert(has_output || blob_file_additions.empty());
constexpr int level = 0;
if (s.ok() && has_output) {
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest, meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.fd.smallest_seqno, meta.fd.largest_seqno, meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction, meta.oldest_blob_file_number, meta.marked_for_compaction, meta.oldest_blob_file_number,
meta.oldest_ancester_time, meta.file_creation_time, meta.oldest_ancester_time, meta.file_creation_time,
meta.file_checksum, meta.file_checksum_func_name); meta.file_checksum, meta.file_checksum_func_name);
edit->SetBlobFileAdditions(std::move(blob_file_additions));
} }
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
stats.micros = env_->NowMicros() - start_micros; stats.micros = env_->NowMicros() - start_micros;
if (has_output) {
stats.bytes_written = meta.fd.GetFileSize(); stats.bytes_written = meta.fd.GetFileSize();
stats.num_output_files = 1;
const auto& blobs = edit->GetBlobFileAdditions();
for (const auto& blob : blobs) {
stats.bytes_written += blob.GetTotalBlobBytes();
}
stats.num_output_files = static_cast<int>(blobs.size()) + 1;
}
cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats); cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
meta.fd.GetFileSize()); stats.bytes_written);
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
return s; return s;
} }

@ -338,6 +338,163 @@ TEST_F(DBWALTest, RecoverWithTableHandle) {
} while (ChangeWalOptions()); } while (ChangeWalOptions());
} }
TEST_F(DBWALTest, RecoverWithBlob) {
// Write a value that's below the prospective size limit for blobs and another
// one that's above. Note that blob files are not actually enabled at this
// point.
constexpr uint64_t min_blob_size = 10;
constexpr char short_value[] = "short";
static_assert(sizeof(short_value) - 1 < min_blob_size,
"short_value too long");
constexpr char long_value[] = "long_value";
static_assert(sizeof(long_value) - 1 >= min_blob_size,
"long_value too short");
ASSERT_OK(Put("key1", short_value));
ASSERT_OK(Put("key2", long_value));
// There should be no files just yet since we haven't flushed.
{
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
ASSERT_TRUE(storage_info->GetBlobFiles().empty());
}
// Reopen the database with blob files enabled. A new table file/blob file
// pair should be written during recovery.
Options options;
options.enable_blob_files = true;
options.min_blob_size = min_blob_size;
options.avoid_flush_during_recovery = false;
options.disable_auto_compactions = true;
Reopen(options);
ASSERT_EQ(Get("key1"), short_value);
// TODO: enable once Get support is implemented for blobs
// ASSERT_EQ(Get("key2"), long_value);
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& l0_files = storage_info->LevelFiles(0);
ASSERT_EQ(l0_files.size(), 1);
const FileMetaData* const table_file = l0_files[0];
assert(table_file);
const auto& blob_files = storage_info->GetBlobFiles();
ASSERT_EQ(blob_files.size(), 1);
const auto& blob_file = blob_files.begin()->second;
assert(blob_file);
ASSERT_EQ(table_file->smallest.user_key(), "key1");
ASSERT_EQ(table_file->largest.user_key(), "key2");
ASSERT_EQ(table_file->fd.smallest_seqno, 1);
ASSERT_EQ(table_file->fd.largest_seqno, 2);
ASSERT_EQ(table_file->oldest_blob_file_number,
blob_file->GetBlobFileNumber());
ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);
#ifndef ROCKSDB_LITE
const InternalStats* const internal_stats = cfd->internal_stats();
assert(internal_stats);
const uint64_t expected_bytes =
table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_FALSE(compaction_stats.empty());
ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes);
ASSERT_EQ(compaction_stats[0].num_output_files, 2);
const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes);
#endif // ROCKSDB_LITE
}
class DBRecoveryTestBlobError
: public DBWALTest,
public testing::WithParamInterface<std::string> {
public:
DBRecoveryTestBlobError() : fault_injection_env_(env_) {}
~DBRecoveryTestBlobError() { Close(); }
FaultInjectionTestEnv fault_injection_env_;
};
INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError,
::testing::ValuesIn(std::vector<std::string>{
"BlobFileBuilder::WriteBlobToFile:AddRecord",
"BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) {
// Write a value. Note that blob files are not actually enabled at this point.
ASSERT_OK(Put("key", "blob"));
// Reopen with blob files enabled but make blob file writing fail during
// recovery.
SyncPoint::GetInstance()->SetCallBack(GetParam(), [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false, Status::IOError());
});
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:BeforeFinishBuildTable", [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(true);
});
SyncPoint::GetInstance()->EnableProcessing();
Options options;
options.enable_blob_files = true;
options.avoid_flush_during_recovery = false;
options.disable_auto_compactions = true;
options.env = &fault_injection_env_;
ASSERT_NOK(TryReopen(options));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// Make sure the files generated by the failed recovery have been deleted.
std::vector<std::string> files;
ASSERT_OK(env_->GetChildren(dbname_, &files));
for (const auto& file : files) {
uint64_t number = 0;
FileType type = kTableFile;
if (!ParseFileName(file, &number, &type)) {
continue;
}
ASSERT_NE(type, kTableFile);
ASSERT_NE(type, kBlobFile);
}
}
TEST_F(DBWALTest, IgnoreRecoveredLog) { TEST_F(DBWALTest, IgnoreRecoveredLog) {
std::string backup_logs = dbname_ + "/backup_logs"; std::string backup_logs = dbname_ + "/backup_logs";

Loading…
Cancel
Save