Accumulate blob file additions in VersionEdit during recovery (#7903)

Summary:
During recovery, RocksDB performs a kind of dummy flush; namely, entries
from the WAL are added to memtables, which then get written to SSTs and
blob files (if enabled) just like during a regular flush. Note that
multiple memtables might be flushed during recovery for the same column
family, for example, if the DB is reopened with a lower write buffer size,
and therefore, we need to make sure to collect all SST and blob file
additions. The patch fixes a bug in the earlier logic which resulted in
later blob file additions overwriting earlier ones.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7903

Test Plan: Added a unit test and ran `db_stress`.

Reviewed By: jay-zhuang

Differential Revision: D26110847

Pulled By: ltamasi

fbshipit-source-id: eddb50a608a88f54f3cec3a423de8235aba951fd
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent 95013df278
commit c696f27432
  1. 4
      db/db_impl/db_impl_open.cc
  2. 66
      db/db_wal_test.cc
  3. 2
      db/version_edit.h

@ -1391,7 +1391,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
meta.oldest_ancester_time, meta.file_creation_time, meta.oldest_ancester_time, meta.file_creation_time,
meta.file_checksum, meta.file_checksum_func_name); meta.file_checksum, meta.file_checksum_func_name);
edit->SetBlobFileAdditions(std::move(blob_file_additions)); for (const auto& blob : blob_file_additions) {
edit->AddBlobFile(blob);
}
} }
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);

@ -437,6 +437,72 @@ TEST_F(DBWALTest, RecoverWithBlob) {
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }
TEST_F(DBWALTest, RecoverWithBlobMultiSST) {
// Write several large (4 KB) values without flushing. Note that blob files
// are not actually enabled at this point.
std::string large_value(1 << 12, 'a');
constexpr int num_keys = 64;
for (int i = 0; i < num_keys; ++i) {
ASSERT_OK(Put(Key(i), large_value));
}
// There should be no files just yet since we haven't flushed.
{
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
ASSERT_NE(cfd, nullptr);
Version* const current = cfd->current();
ASSERT_NE(current, nullptr);
const VersionStorageInfo* const storage_info = current->storage_info();
ASSERT_NE(storage_info, nullptr);
ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
ASSERT_TRUE(storage_info->GetBlobFiles().empty());
}
// Reopen the database with blob files enabled and write buffer size set to a
// smaller value. Multiple table files+blob files should be written and added
// to the Version during recovery.
Options options;
options.write_buffer_size = 1 << 16; // 64 KB
options.enable_blob_files = true;
options.avoid_flush_during_recovery = false;
options.disable_auto_compactions = true;
options.env = env_;
Reopen(options);
for (int i = 0; i < num_keys; ++i) {
ASSERT_EQ(Get(Key(i)), large_value);
}
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
ASSERT_NE(cfd, nullptr);
Version* const current = cfd->current();
ASSERT_NE(current, nullptr);
const VersionStorageInfo* const storage_info = current->storage_info();
ASSERT_NE(storage_info, nullptr);
const auto& l0_files = storage_info->LevelFiles(0);
ASSERT_GT(l0_files.size(), 1);
const auto& blob_files = storage_info->GetBlobFiles();
ASSERT_GT(blob_files.size(), 1);
ASSERT_EQ(l0_files.size(), blob_files.size());
}
class DBRecoveryTestBlobError class DBRecoveryTestBlobError
: public DBWALTest, : public DBWALTest,
public testing::WithParamInterface<std::string> { public testing::WithParamInterface<std::string> {

@ -431,6 +431,7 @@ class VersionEdit {
} }
void SetBlobFileAdditions(BlobFileAdditions blob_file_additions) { void SetBlobFileAdditions(BlobFileAdditions blob_file_additions) {
assert(blob_file_additions_.empty());
blob_file_additions_ = std::move(blob_file_additions); blob_file_additions_ = std::move(blob_file_additions);
} }
@ -454,6 +455,7 @@ class VersionEdit {
} }
void SetBlobFileGarbages(BlobFileGarbages blob_file_garbages) { void SetBlobFileGarbages(BlobFileGarbages blob_file_garbages) {
assert(blob_file_garbages_.empty());
blob_file_garbages_ = std::move(blob_file_garbages); blob_file_garbages_ = std::move(blob_file_garbages);
} }

Loading…
Cancel
Save