Fix a bug in file ingestion (#5760)

Summary:
Before this PR, when the number of column families involved in a file ingestion exceeds 2, a bug in the looping logic prevents correct file number being assigned to each ingestion job.
Also skip deleting non-existing hard links during cleanup-after-failure.

Test plan (devserver)
```
$COMPILE_WITH_ASAN=1 make all
$./external_sst_file_test --gtest_filter=ExternalSSTFileTest/ExternalSSTFileTest.IngestFilesIntoMultipleColumnFamilies_*/*
$makke check
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5760

Differential Revision: D17142982

Pulled By: riversand963

fbshipit-source-id: 06c1847a4e7a402647bcf28d124e70f2a0f9daf6
main
Yanqin Jin 5 years ago committed by Facebook Github Bot
parent 672befea2a
commit 44eca41add
  1. 1
      HISTORY.md
  2. 4
      db/db_impl/db_impl.cc
  3. 5
      db/external_sst_file_ingestion_job.cc
  4. 55
      db/external_sst_file_test.cc

@ -4,6 +4,7 @@
* Fixed a number of data races in BlobDB. * Fixed a number of data races in BlobDB.
* Fix a bug where the compaction snapshot refresh feature is not disabled as advertised when `snap_refresh_nanos` is set to 0.. * Fix a bug where the compaction snapshot refresh feature is not disabled as advertised when `snap_refresh_nanos` is set to 0..
* Fix bloom filter lookups by the MultiGet batching API when BlockBasedTableOptions::whole_key_filtering is false, by checking that a key is in the perfix_extractor domain and extracting the prefix before looking up. * Fix bloom filter lookups by the MultiGet batching API when BlockBasedTableOptions::whole_key_filtering is false, by checking that a key is in the perfix_extractor domain and extracting the prefix before looking up.
* Fix a bug in file ingestion caused by incorrect file number allocation when the number of column families involved in the ingestion exceeds 2.
### New Features ### New Features
* VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting. * VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting.
* When user uses options.force_consistency_check in RocksDb, instead of crashing the process, we now pass the error back to the users without killing the process. * When user uses options.force_consistency_check in RocksDb, instead of crashing the process, we now pass the error back to the users without killing the process.

@ -3766,9 +3766,9 @@ Status DBImpl::IngestExternalFiles(
exec_results.emplace_back(false, Status::OK()); exec_results.emplace_back(false, Status::OK());
} }
// TODO(yanqin) maybe make jobs run in parallel // TODO(yanqin) maybe make jobs run in parallel
uint64_t start_file_number = next_file_number;
for (size_t i = 1; i != num_cfs; ++i) { for (size_t i = 1; i != num_cfs; ++i) {
uint64_t start_file_number = start_file_number += args[i - 1].external_files.size();
next_file_number + args[i - 1].external_files.size();
auto* cfd = auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd(); static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);

@ -160,7 +160,7 @@ Status ExternalSstFileIngestionJob::Prepare(
// We failed, remove all files that we copied into the db // We failed, remove all files that we copied into the db
for (IngestedFileInfo& f : files_to_ingest_) { for (IngestedFileInfo& f : files_to_ingest_) {
if (f.internal_file_path.empty()) { if (f.internal_file_path.empty()) {
break; continue;
} }
Status s = env_->DeleteFile(f.internal_file_path); Status s = env_->DeleteFile(f.internal_file_path);
if (!s.ok()) { if (!s.ok()) {
@ -291,6 +291,9 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
// We failed to add the files to the database // We failed to add the files to the database
// remove all the files we copied // remove all the files we copied
for (IngestedFileInfo& f : files_to_ingest_) { for (IngestedFileInfo& f : files_to_ingest_) {
if (f.internal_file_path.empty()) {
continue;
}
Status s = env_->DeleteFile(f.internal_file_path); Status s = env_->DeleteFile(f.internal_file_path);
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, ROCKS_LOG_WARN(db_options_.info_log,

@ -2367,10 +2367,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
new FaultInjectionTestEnv(env_)); new FaultInjectionTestEnv(env_));
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = fault_injection_env.get(); options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu", "eevee"}, options);
std::vector<ColumnFamilyHandle*> column_families; std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]); column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]); column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size()); std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) { for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno ifo.allow_global_seqno = true; // Always allow global_seqno
@ -2384,6 +2385,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back( data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc // Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data( std::vector<std::map<std::string, std::string>> true_data(
column_families.size()); column_families.size());
@ -2391,8 +2395,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
-1, true, true_data); -1, true, true_data);
ASSERT_OK(s); ASSERT_OK(s);
Close(); Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
ASSERT_EQ(2, handles_.size()); options);
ASSERT_EQ(3, handles_.size());
int cf = 0; int cf = 0;
for (const auto& verify_map : true_data) { for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) { for (const auto& elem : verify_map) {
@ -2424,10 +2429,11 @@ TEST_P(ExternalSSTFileTest,
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = fault_injection_env.get(); options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu", "eevee"}, options);
const std::vector<std::map<std::string, std::string>> data_before_ingestion = const std::vector<std::map<std::string, std::string>> data_before_ingestion =
{{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}}, {{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}}}; {{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}},
{{"bar4", "bv4_0"}, {"bar5", "bv5_0"}, {"bar6", "bv6_0"}}};
for (size_t i = 0; i != handles_.size(); ++i) { for (size_t i = 0; i != handles_.size(); ++i) {
int cf = static_cast<int>(i); int cf = static_cast<int>(i);
const auto& orig_data = data_before_ingestion[i]; const auto& orig_data = data_before_ingestion[i];
@ -2440,6 +2446,7 @@ TEST_P(ExternalSSTFileTest,
std::vector<ColumnFamilyHandle*> column_families; std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]); column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]); column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size()); std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) { for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno ifo.allow_global_seqno = true; // Always allow global_seqno
@ -2453,6 +2460,8 @@ TEST_P(ExternalSSTFileTest,
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back( data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc // Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data( std::vector<std::map<std::string, std::string>> true_data(
column_families.size()); column_families.size());
@ -2506,10 +2515,11 @@ TEST_P(ExternalSSTFileTest,
dbfull()->ReleaseSnapshot(read_opts.snapshot); dbfull()->ReleaseSnapshot(read_opts.snapshot);
Close(); Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
// Should see consistent state after ingestion for all column families even // Should see consistent state after ingestion for all column families even
// without snapshot. // without snapshot.
ASSERT_EQ(2, handles_.size()); ASSERT_EQ(3, handles_.size());
int cf = 0; int cf = 0;
for (const auto& verify_map : true_data) { for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) { for (const auto& elem : verify_map) {
@ -2539,10 +2549,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"}, "DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu", "eevee"}, options);
std::vector<ColumnFamilyHandle*> column_families; std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]); column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]); column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size()); std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) { for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno ifo.allow_global_seqno = true; // Always allow global_seqno
@ -2556,6 +2567,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back( data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc // Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data( std::vector<std::map<std::string, std::string>> true_data(
column_families.size()); column_families.size());
@ -2575,8 +2589,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
fault_injection_env->SetFilesystemActive(true); fault_injection_env->SetFilesystemActive(true);
Close(); Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
ASSERT_EQ(2, handles_.size()); options);
ASSERT_EQ(3, handles_.size());
int cf = 0; int cf = 0;
for (const auto& verify_map : true_data) { for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) { for (const auto& elem : verify_map) {
@ -2605,10 +2620,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
"DBImpl::IngestExternalFiles:BeforeJobsRun:1"}, "DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu", "eevee"}, options);
std::vector<ColumnFamilyHandle*> column_families; std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]); column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]); column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size()); std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) { for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno ifo.allow_global_seqno = true; // Always allow global_seqno
@ -2622,6 +2638,8 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back( data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc // Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data( std::vector<std::map<std::string, std::string>> true_data(
column_families.size()); column_families.size());
@ -2641,8 +2659,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
fault_injection_env->SetFilesystemActive(true); fault_injection_env->SetFilesystemActive(true);
Close(); Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
ASSERT_EQ(2, handles_.size()); options);
ASSERT_EQ(3, handles_.size());
int cf = 0; int cf = 0;
for (const auto& verify_map : true_data) { for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) { for (const auto& elem : verify_map) {
@ -2662,7 +2681,7 @@ TEST_P(ExternalSSTFileTest,
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = fault_injection_env.get(); options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu", "eevee"}, options);
SyncPoint::GetInstance()->ClearTrace(); SyncPoint::GetInstance()->ClearTrace();
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
@ -2680,6 +2699,7 @@ TEST_P(ExternalSSTFileTest,
std::vector<ColumnFamilyHandle*> column_families; std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]); column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]); column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size()); std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) { for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno ifo.allow_global_seqno = true; // Always allow global_seqno
@ -2693,6 +2713,8 @@ TEST_P(ExternalSSTFileTest,
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back( data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc // Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data( std::vector<std::map<std::string, std::string>> true_data(
column_families.size()); column_families.size());
@ -2713,8 +2735,9 @@ TEST_P(ExternalSSTFileTest,
fault_injection_env->DropUnsyncedFileData(); fault_injection_env->DropUnsyncedFileData();
fault_injection_env->SetFilesystemActive(true); fault_injection_env->SetFilesystemActive(true);
Close(); Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
ASSERT_EQ(2, handles_.size()); options);
ASSERT_EQ(3, handles_.size());
int cf = 0; int cf = 0;
for (const auto& verify_map : true_data) { for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) { for (const auto& elem : verify_map) {

Loading…
Cancel
Save