Improve flushing multiple column families (#4708)

Summary:
If one column family is dropped, we should simply skip it and continue to flush
other active ones.
Currently we use Status::ShutdownInProgress to notify caller of column families
being dropped. In the future, we should consider using a different Status code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4708

Differential Revision: D13378954

Pulled By: riversand963

fbshipit-source-id: 42f248cdf2d32d4c0f677cd39012694b8f1328ca
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent 67e5b5420e
commit 4fce44fc8b
  1. 1
      HISTORY.md
  2. 81
      db/db_flush_test.cc
  3. 83
      db/db_impl_compaction_flush.cc
  4. 2
      db/memtable_list.cc
  5. 2
      db/version_edit.cc
  6. 79
      db/version_set.cc
  7. 140
      db/version_set_test.cc

@ -2,6 +2,7 @@
## Unreleased
### New Features
* Enabled checkpoint on readonly db (DBImplReadOnly).
* Make DB ignore dropped column families while committing results of atomic flush.
### Public API Change
* Transaction::GetForUpdate is extended with a do_validate parameter with default value of true. If false it skips validating the snapshot before doing the read. Similarly ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with assume_tracked with default value of false. If true it indicates that call is assumed to be after a ::GetForUpdate.

@ -407,6 +407,87 @@ TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {
Destroy(options);
}
TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {
bool atomic_flush = GetParam();
if (!atomic_flush) {
return;
}
Options options = CurrentOptions();
options.create_if_missing = true;
options.atomic_flush = atomic_flush;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
size_t num_cfs = handles_.size();
ASSERT_EQ(3, num_cfs);
WriteOptions wopts;
wopts.disableWAL = true;
std::vector<int> cf_ids;
for (size_t i = 0; i != num_cfs; ++i) {
int cf_id = static_cast<int>(i);
ASSERT_OK(Put(cf_id, "key", "value", wopts));
cf_ids.push_back(cf_id);
}
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
ASSERT_TRUE(Flush(cf_ids).IsShutdownInProgress());
Destroy(options);
}
TEST_P(DBAtomicFlushTest,
FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun) {
bool atomic_flush = GetParam();
if (!atomic_flush) {
return;
}
Options options = CurrentOptions();
options.create_if_missing = true;
options.atomic_flush = atomic_flush;
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
"DBAtomicFlushTest::BeforeDropCF"},
{"DBAtomicFlushTest::AfterDropCF",
"DBImpl::BackgroundCallFlush:start"}});
SyncPoint::GetInstance()->EnableProcessing();
size_t num_cfs = handles_.size();
ASSERT_EQ(3, num_cfs);
WriteOptions wopts;
wopts.disableWAL = true;
for (size_t i = 0; i != num_cfs; ++i) {
int cf_id = static_cast<int>(i);
ASSERT_OK(Put(cf_id, "key", "value", wopts));
}
port::Thread user_thread([&]() {
TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
});
FlushOptions flush_opts;
flush_opts.wait = true;
ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
user_thread.join();
for (size_t i = 0; i != num_cfs; ++i) {
int cf_id = static_cast<int>(i);
ASSERT_EQ("value", Get(cf_id, "key"));
}
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "eevee"}, options);
num_cfs = handles_.size();
ASSERT_EQ(2, num_cfs);
for (size_t i = 0; i != num_cfs; ++i) {
int cf_id = static_cast<int>(i);
ASSERT_EQ("value", Get(cf_id, "key"));
}
Destroy(options);
}
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());

@ -241,20 +241,25 @@ Status DBImpl::FlushMemTablesToOutputFiles(
return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer);
}
Status s;
Status status;
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
const MutableCFOptions& mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
SuperVersionContext* superversion_context = arg.superversion_context_;
s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
job_context, superversion_context,
log_buffer);
Status s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
job_context, superversion_context,
log_buffer);
if (!s.ok()) {
break;
status = s;
if (!s.IsShutdownInProgress()) {
// At this point, DB is not shutting down, nor is cfd dropped.
// Something is wrong, thus we break out of the loop.
break;
}
}
}
return s;
return status;
}
/*
@ -353,8 +358,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
autovector<std::pair<bool, Status>> exec_status;
for (int i = 0; i != num_cfs; ++i) {
// Initially all jobs are not executed, with status OK.
std::pair<bool, Status> elem(false, Status::OK());
exec_status.emplace_back(elem);
exec_status.emplace_back(false, Status::OK());
}
if (s.ok()) {
@ -363,10 +367,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
exec_status[i].second =
jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]);
exec_status[i].first = true;
if (!exec_status[i].second.ok()) {
s = exec_status[i].second;
break;
}
}
if (num_cfs > 1) {
TEST_SYNC_POINT(
@ -374,17 +374,27 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
TEST_SYNC_POINT(
"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
}
if (s.ok()) {
exec_status[0].second =
jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]);
exec_status[0].first = true;
if (!exec_status[0].second.ok()) {
s = exec_status[0].second;
exec_status[0].second =
jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]);
exec_status[0].first = true;
Status error_status;
for (const auto& e : exec_status) {
if (!e.second.ok()) {
s = e.second;
if (!e.second.IsShutdownInProgress()) {
// If a flush job did not return OK, and the CF is not dropped, and
// the DB is not shutting down, then we have to return this result to
// caller later.
error_status = e.second;
}
}
}
s = error_status.ok() ? s : error_status;
}
if (s.ok()) {
if (s.ok() || s.IsShutdownInProgress()) {
// Sync on all distinct output directories.
for (auto dir : distinct_output_dirs) {
if (dir != nullptr) {
@ -398,6 +408,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (s.ok()) {
autovector<const autovector<MemTable*>*> mems_list;
for (int i = 0; i != num_cfs; ++i) {
if (cfds[i]->IsDropped()) {
continue;
}
const auto& mems = jobs[i].GetMemTables();
mems_list.emplace_back(&mems);
}
@ -405,6 +418,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
autovector<MemTableList*> imm_lists;
autovector<const MutableCFOptions*> mutable_cf_options_list;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
all_cfds.emplace_back(cfd);
imm_lists.emplace_back(cfd->imm());
mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
@ -418,10 +434,13 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
}
}
if (s.ok()) {
if (s.ok() || s.IsShutdownInProgress()) {
assert(num_cfs ==
static_cast<int>(job_context->superversion_contexts.size()));
for (int i = 0; i != num_cfs; ++i) {
if (cfds[i]->IsDropped()) {
continue;
}
InstallSuperVersionAndScheduleWork(cfds[i],
&job_context->superversion_contexts[i],
*cfds[i]->GetLatestMutableCFOptions());
@ -437,6 +456,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
for (int i = 0; i != num_cfs; ++i) {
if (cfds[i]->IsDropped()) {
continue;
}
NotifyOnFlushCompleted(cfds[i], &file_meta[i],
*cfds[i]->GetLatestMutableCFOptions(),
job_context->job_id, jobs[i].GetTableProperties());
@ -456,7 +478,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
#endif // ROCKSDB_LITE
}
if (!s.ok()) {
// Need to undo atomic flush if something went wrong, i.e. s is not OK and
// it is not because of CF drop.
if (!s.ok() && !s.IsShutdownInProgress()) {
// Have to cancel the flush jobs that have NOT executed because we need to
// unref the versions.
for (int i = 0; i != num_cfs; ++i) {
@ -464,17 +488,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
jobs[i].Cancel();
}
}
if (!s.IsShutdownInProgress()) {
for (int i = 0; i != num_cfs; ++i) {
if (exec_status[i].first && exec_status[i].second.ok()) {
auto& mems = jobs[i].GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(mems,
file_meta[i].fd.GetNumber());
}
for (int i = 0; i != num_cfs; ++i) {
if (exec_status[i].first && exec_status[i].second.ok()) {
auto& mems = jobs[i].GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(mems,
file_meta[i].fd.GetNumber());
}
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
return s;
@ -1541,6 +1563,7 @@ Status DBImpl::AtomicFlushMemTables(
write_thread_.ExitUnbatched(&w);
}
}
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
if (s.ok() && flush_options.wait) {
autovector<const uint64_t*> flush_memtable_ids;

@ -426,7 +426,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
imm_lists[pos]->InstallNewVersion();
}
if (s.ok()) {
if (s.ok() || s.IsShutdownInProgress()) {
for (size_t i = 0; i != batch_sz; ++i) {
if (tmp_cfds[i]->IsDropped()) {
continue;

@ -579,7 +579,7 @@ std::string VersionEdit::DebugString(bool hex_key) const {
AppendNumberTo(&r, max_column_family_);
}
if (is_in_atomic_group_) {
r.append("\n AtomicGroup: ");
r.append("\n AtomicGroup: ");
AppendNumberTo(&r, remaining_entries_);
r.append(" entries remains");
}

@ -2849,6 +2849,7 @@ Status VersionSet::ProcessManifestWrites(
batch_edits.push_back(first_writer.edit_list.front());
} else {
auto it = manifest_writers_.cbegin();
size_t group_start = std::numeric_limits<size_t>::max();
while (it != manifest_writers_.cend()) {
if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
// no group commits for column family add or drop
@ -2857,7 +2858,36 @@ Status VersionSet::ProcessManifestWrites(
last_writer = *(it++);
assert(last_writer != nullptr);
assert(last_writer->cfd != nullptr);
if (last_writer->cfd != nullptr && last_writer->cfd->IsDropped()) {
if (last_writer->cfd->IsDropped()) {
// If we detect a dropped CF at this point, and the corresponding
// version edits belong to an atomic group, then we need to find out
// the preceding version edits in the same atomic group, and update
// their `remaining_entries_` member variable because we are NOT going
// to write the version edits' of dropped CF to the MANIFEST. If we
// don't update, then Recover can report corrupted atomic group because
// the `remaining_entries_` do not match.
if (!batch_edits.empty()) {
if (batch_edits.back()->is_in_atomic_group_ &&
batch_edits.back()->remaining_entries_ > 0) {
assert(group_start < batch_edits.size());
const auto& edit_list = last_writer->edit_list;
size_t k = 0;
while (k < edit_list.size()) {
if (!edit_list[k]->is_in_atomic_group_) {
break;
} else if (edit_list[k]->remaining_entries_ == 0) {
++k;
break;
}
++k;
}
for (auto i = group_start; i < batch_edits.size(); ++i) {
assert(static_cast<uint32_t>(k) <=
batch_edits.back()->remaining_entries_);
batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
}
}
}
continue;
}
// We do a linear search on versions because versions is small.
@ -2888,6 +2918,15 @@ Status VersionSet::ProcessManifestWrites(
}
assert(builder != nullptr); // make checker happy
for (const auto& e : last_writer->edit_list) {
if (e->is_in_atomic_group_) {
if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
(batch_edits.back()->is_in_atomic_group_ &&
batch_edits.back()->remaining_entries_ == 0)) {
group_start = batch_edits.size();
}
} else if (group_start != std::numeric_limits<size_t>::max()) {
group_start = std::numeric_limits<size_t>::max();
}
LogAndApplyHelper(last_writer->cfd, builder, version, e, mu);
batch_edits.push_back(e);
}
@ -2900,6 +2939,42 @@ Status VersionSet::ProcessManifestWrites(
}
}
#ifndef NDEBUG
// Verify that version edits of atomic groups have correct
// remaining_entries_.
size_t k = 0;
while (k < batch_edits.size()) {
while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
++k;
}
if (k == batch_edits.size()) {
break;
}
size_t i = k;
while (i < batch_edits.size()) {
if (!batch_edits[i]->is_in_atomic_group_) {
break;
}
assert(i - k + batch_edits[i]->remaining_entries_ ==
batch_edits[k]->remaining_entries_);
if (batch_edits[i]->remaining_entries_ == 0) {
++i;
break;
}
++i;
}
assert(batch_edits[i - 1]->is_in_atomic_group_);
assert(0 == batch_edits[i - 1]->remaining_entries_);
std::vector<VersionEdit*> tmp;
for (size_t j = k; j != i; ++j) {
tmp.emplace_back(batch_edits[j]);
}
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
k = i;
}
#endif // NDEBUG
uint64_t new_manifest_file_size = 0;
Status s;
@ -3205,7 +3280,7 @@ Status VersionSet::LogAndApply(
if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal();
}
return Status::OK();
return Status::ShutdownInProgress();
}
return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,

@ -605,9 +605,13 @@ TEST_F(FindLevelFileTest, LevelOverlappingFiles) {
ASSERT_TRUE(Overlaps("600", "700"));
}
class VersionSetTest : public testing::Test {
class VersionSetTestBase {
public:
VersionSetTest()
const static std::string kColumnFamilyName1;
const static std::string kColumnFamilyName2;
const static std::string kColumnFamilyName3;
VersionSetTestBase()
: env_(Env::Default()),
dbname_(test::PerThreadDBPath("version_set_test")),
db_options_(),
@ -635,8 +639,9 @@ class VersionSetTest : public testing::Test {
new_db.SetNextFile(2);
new_db.SetLastSequence(0);
const std::vector<std::string> cf_names = {kDefaultColumnFamilyName,
"alice", "bob"};
const std::vector<std::string> cf_names = {
kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
kColumnFamilyName3};
const int kInitialNumOfCfs = static_cast<int>(cf_names.size());
autovector<VersionEdit> new_cfs;
uint64_t last_seq = 1;
@ -711,6 +716,15 @@ class VersionSetTest : public testing::Test {
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
};
const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";
const std::string VersionSetTestBase::kColumnFamilyName2 = "bob";
const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";
class VersionSetTest : public VersionSetTestBase, public testing::Test {
public:
VersionSetTest() : VersionSetTestBase() {}
};
TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
NewDB();
const int kGroupSize = 5;
@ -958,6 +972,124 @@ TEST_F(VersionSetTest, HandleIncorrectAtomicGroupSize) {
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_TRUE(incorrect_group_size);
}
class VersionSetTestDropOneCF : public VersionSetTestBase,
public testing::TestWithParam<std::string> {
public:
VersionSetTestDropOneCF() : VersionSetTestBase() {}
};
// This test simulates the following execution sequence
// Time thread1 bg_flush_thr
// | Prepare version edits (e1,e2,e3) for atomic
// | flush cf1, cf2, cf3
// | Enqueue e to drop cfi
// | to manifest_writers_
// | Enqueue (e1,e2,e3) to manifest_writers_
// |
// | Apply e,
// | cfi.IsDropped() is true
// | Apply (e1,e2,e3),
// | since cfi.IsDropped() == true, we need to
// | drop ei and write the rest to MANIFEST.
// V
//
// Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and
// last column family in an atomic group.
TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
std::vector<ColumnFamilyDescriptor> column_families;
SequenceNumber last_seqno;
std::unique_ptr<log::Writer> log_writer;
PrepareManifest(&column_families, &last_seqno, &log_writer);
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
ASSERT_OK(s);
EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
EXPECT_EQ(column_families.size(),
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
const int kAtomicGroupSize = 3;
const std::vector<std::string> non_default_cf_names = {
kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3};
// Drop one column family
VersionEdit drop_cf_edit;
drop_cf_edit.DropColumnFamily();
const std::string cf_to_drop_name(GetParam());
auto cfd_to_drop =
versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);
ASSERT_NE(nullptr, cfd_to_drop);
cfd_to_drop->Ref(); // Increase its refcount because cfd_to_drop is used later
drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());
mutex_.Lock();
s = versions_->LogAndApply(cfd_to_drop,
*cfd_to_drop->GetLatestMutableCFOptions(),
&drop_cf_edit, &mutex_);
mutex_.Unlock();
ASSERT_OK(s);
std::vector<VersionEdit> edits(kAtomicGroupSize);
uint32_t remaining = kAtomicGroupSize;
size_t i = 0;
autovector<ColumnFamilyData*> cfds;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<autovector<VersionEdit*>> edit_lists;
for (const auto& cf_name : non_default_cf_names) {
auto cfd = (cf_name != cf_to_drop_name)
? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name)
: cfd_to_drop;
ASSERT_NE(nullptr, cfd);
cfds.push_back(cfd);
mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
edits[i].SetColumnFamily(cfd->GetID());
edits[i].SetLogNumber(0);
edits[i].SetNextFile(2);
edits[i].MarkAtomicGroup(--remaining);
edits[i].SetLastSequence(last_seqno++);
autovector<VersionEdit*> tmp_edits;
tmp_edits.push_back(&edits[i]);
edit_lists.emplace_back(tmp_edits);
++i;
}
int called = 0;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) {
std::vector<VersionEdit*>* tmp_edits =
reinterpret_cast<std::vector<VersionEdit*>*>(arg);
EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size());
for (const auto e : *tmp_edits) {
bool found = false;
for (const auto& e2 : edits) {
if (&e2 == e) {
found = true;
break;
}
}
ASSERT_TRUE(found);
}
++called;
});
SyncPoint::GetInstance()->EnableProcessing();
mutex_.Lock();
s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists,
&mutex_);
mutex_.Unlock();
ASSERT_OK(s);
ASSERT_EQ(1, called);
if (cfd_to_drop->Unref()) {
delete cfd_to_drop;
cfd_to_drop = nullptr;
}
}
INSTANTIATE_TEST_CASE_P(
AtomicGroup, VersionSetTestDropOneCF,
testing::Values(VersionSetTestBase::kColumnFamilyName1,
VersionSetTestBase::kColumnFamilyName2,
VersionSetTestBase::kColumnFamilyName3));
} // namespace rocksdb
int main(int argc, char** argv) {

Loading…
Cancel
Save