Refactor atomic flush result installation to MANIFEST (#4791)

Summary:
as titled.
Since different bg flush threads can flush different sets of column families
(due to column family creation and drop), we decide not to let one thread
perform atomic flush result installation for other threads. Bg flush threads
will install their atomic flush results sequentially to MANIFEST, using
a conditional variable, i.e. atomic_flush_install_cv_ to coordinate.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4791

Differential Revision: D13498930

Pulled By: riversand963

fbshipit-source-id: dd7482fc41f4bd22dad1e1ef7d4764ef424688d7
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent 77a8d4d476
commit a07175af65
  1. 2
      db/db_impl.cc
  2. 19
      db/db_impl.h
  3. 64
      db/db_impl_compaction_flush.cc
  4. 9
      db/flush_job_test.cc
  5. 12
      db/memtable.h
  6. 323
      db/memtable_list.cc
  7. 51
      db/memtable_list.h
  8. 343
      db/memtable_list_test.cc
  9. 4
      db/version_set_test.cc

@ -220,7 +220,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
preserve_deletes_(options.preserve_deletes),
closed_(false),
error_handler_(this, immutable_db_options_, &mutex_),
atomic_flush_commit_in_progress_(false) {
atomic_flush_install_cv_(&mutex_) {
// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
// WriteUnprepared, which should use seq_per_batch_.
assert(batch_per_txn_ || seq_per_batch_);

@ -1630,15 +1630,16 @@ class DBImpl : public DB {
ErrorHandler error_handler_;
// True if the DB is committing atomic flush.
// TODO (yanqin) the current impl assumes that the entire DB belongs to
// a single atomic flush group. In the future we need to add a new class
// (struct) similar to the following to make it more general.
// struct AtomicFlushGroup {
// bool commit_in_progress_;
// std::vector<MemTableList*> imm_lists;
// };
bool atomic_flush_commit_in_progress_;
// Conditional variable to coordinate installation of atomic flush results.
// With atomic flush, each bg thread installs the result of flushing multiple
// column families, and different threads can flush different column
// families. It's difficult to rely on one thread to perform batch
// installation for all threads. This is different from the non-atomic flush
// case.
// atomic_flush_install_cv_ makes sure that threads install atomic flush
// results sequentially. Flush results of memtables with lower IDs get
// installed to MANIFEST first.
InstrumentedCondVar atomic_flush_install_cv_;
};
extern Options SanitizeOptions(const std::string& db,

@ -404,35 +404,66 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
}
}
}
}
if (s.ok()) {
autovector<const autovector<MemTable*>*> mems_list;
for (int i = 0; i != num_cfs; ++i) {
auto wait_to_install_func = [&]() {
bool ready = true;
for (size_t i = 0; i != cfds.size(); ++i) {
const auto& mems = jobs[i].GetMemTables();
if (cfds[i]->IsDropped()) {
// If the column family is dropped, then do not wait.
continue;
} else if (!mems.empty() &&
cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
// If a flush job needs to install the flush result for mems and
// mems[0] is not the earliest memtable, it means another thread must
// be installing flush results for the same column family, then the
// current thread needs to wait.
ready = false;
break;
} else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
bg_flush_args[i].max_memtable_id_) {
// If a flush job does not need to install flush results, then it has
// to wait until all memtables up to max_memtable_id_ (inclusive) are
// installed.
ready = false;
break;
}
const auto& mems = jobs[i].GetMemTables();
mems_list.emplace_back(&mems);
}
autovector<ColumnFamilyData*> all_cfds;
autovector<MemTableList*> imm_lists;
return ready;
};
bool resuming_from_bg_err = error_handler_.IsDBStopped();
while ((!error_handler_.IsDBStopped() ||
error_handler_.GetRecoveryError().ok()) &&
!wait_to_install_func()) {
atomic_flush_install_cv_.Wait();
}
s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
: error_handler_.GetBGError();
}
if (s.ok()) {
autovector<ColumnFamilyData*> tmp_cfds;
autovector<const autovector<MemTable*>*> mems_list;
autovector<const MutableCFOptions*> mutable_cf_options_list;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i].GetMemTables();
if (!cfds[i]->IsDropped() && !mems.empty()) {
tmp_cfds.emplace_back(cfds[i]);
mems_list.emplace_back(&mems);
mutable_cf_options_list.emplace_back(
cfds[i]->GetLatestMutableCFOptions());
}
all_cfds.emplace_back(cfd);
imm_lists.emplace_back(cfd->imm());
mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
}
s = MemTableList::TryInstallMemtableFlushResults(
imm_lists, all_cfds, mutable_cf_options_list, mems_list,
&atomic_flush_commit_in_progress_, &logs_with_prep_tracker_,
s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, file_meta, &job_context->memtables_to_free,
directories_.GetDbDir(), log_buffer);
}
}
if (s.ok() || s.IsShutdownInProgress()) {
assert(num_cfs ==
@ -2104,6 +2135,7 @@ void DBImpl::BackgroundCallFlush() {
bg_flush_scheduled_--;
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
atomic_flush_install_cv_.SignalAll();
bg_cv_.SignalAll();
// IMPORTANT: there should be no code after calling SignalAll. This call may
// signal the DB destructor that it's OK to proceed with destruction. In

@ -279,7 +279,6 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
*cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
mem->SetID(i);
mem->Ref();
mem->TEST_AtomicFlushSequenceNumber() = 123;
for (size_t j = 0; j != num_keys_per_memtable; ++j) {
std::string key(ToString(j + i * num_keys_per_memtable));
@ -325,17 +324,13 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
const auto& mems = flush_jobs[i].GetMemTables();
mems_list.push_back(&mems);
}
autovector<MemTableList*> imm_lists;
autovector<const MutableCFOptions*> mutable_cf_options_list;
for (auto cfd : all_cfds) {
imm_lists.push_back(cfd->imm());
mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
}
bool atomic_flush_commit_in_progress = false;
Status s = MemTableList::TryInstallMemtableFlushResults(
imm_lists, all_cfds, mutable_cf_options_list, mems_list,
&atomic_flush_commit_in_progress, nullptr /* logs_prep_tracker */,
Status s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, file_metas, &job_context.memtables_to_free,
nullptr /* db_directory */, nullptr /* log_buffer */);
ASSERT_OK(s);

@ -386,13 +386,15 @@ class MemTable {
uint64_t GetID() const { return id_; }
SequenceNumber& TEST_AtomicFlushSequenceNumber() {
return atomic_flush_seqno_;
}
void SetFlushCompleted(bool completed) { flush_completed_ = completed; }
uint64_t GetFileNumber() const { return file_number_; }
void TEST_SetFlushCompleted(bool completed) { flush_completed_ = completed; }
void SetFileNumber(uint64_t file_num) { file_number_ = file_num; }
void TEST_SetFileNumber(uint64_t file_num) { file_number_ = file_num; }
void SetFlushInProgress(bool in_progress) {
flush_in_progress_ = in_progress;
}
private:
enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

@ -260,228 +260,6 @@ void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) {
}
}
// Try to record multiple successful flush to the MANIFEST as an atomic unit.
// This function may just return Status::OK if there has already been
// a concurrent thread performing actual recording.
Status MemTableList::TryInstallMemtableFlushResults(
autovector<MemTableList*>& imm_lists,
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
bool* atomic_flush_commit_in_progress, LogsWithPrepTracker* prep_tracker,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData>& file_metas,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
mu->AssertHeld();
for (size_t k = 0; k != mems_list.size(); ++k) {
for (size_t i = 0; i != mems_list[k]->size(); ++i) {
assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
(*mems_list[k])[i]->flush_completed_ = true;
(*mems_list[k])[i]->file_number_ = file_metas[k].fd.GetNumber();
}
}
assert(atomic_flush_commit_in_progress != nullptr);
Status s;
if (*atomic_flush_commit_in_progress) {
// If the function reaches here, there must be a concurrent thread that
// have already started recording to MANIFEST. Therefore we should just
// return Status::OK and let the othe thread finish writing to MANIFEST on
// our behalf.
return s;
}
// If the function reaches here, the current thread will start writing to
// MANIFEST. It may record to MANIFEST the flush results of other flushes.
*atomic_flush_commit_in_progress = true;
auto comp = [&imm_lists](size_t lh, size_t rh) {
const auto& memlist1 = imm_lists[lh]->current_->memlist_;
const auto& memlist2 = imm_lists[rh]->current_->memlist_;
auto it1 = memlist1.rbegin();
auto it2 = memlist2.rbegin();
return (*it1)->atomic_flush_seqno_ > (*it2)->atomic_flush_seqno_;
};
// The top of the heap is the memtable with smallest atomic_flush_seqno_.
std::priority_queue<size_t, std::vector<size_t>, decltype(comp)> heap(comp);
// Sequence number of the oldest unfinished atomic flush.
SequenceNumber min_unfinished_seqno = kMaxSequenceNumber;
// Populate the heap with first element of each imm iff. it has been
// flushed to storage, i.e. flush_completed_ is true.
size_t num = imm_lists.size();
assert(num == cfds.size());
for (size_t i = 0; i != num; ++i) {
std::list<MemTable*>& memlist = imm_lists[i]->current_->memlist_;
if (memlist.empty()) {
continue;
}
auto it = memlist.rbegin();
if ((*it)->flush_completed_) {
heap.emplace(i);
} else if (min_unfinished_seqno > (*it)->atomic_flush_seqno_) {
min_unfinished_seqno = (*it)->atomic_flush_seqno_;
}
}
while (s.ok() && !heap.empty()) {
autovector<size_t> batch;
SequenceNumber seqno = kMaxSequenceNumber;
// Pop from the heap the memtables that belong to the same atomic flush,
// namely their atomic_flush_seqno_ are equal.
do {
size_t pos = heap.top();
const auto& memlist = imm_lists[pos]->current_->memlist_;
MemTable* mem = *(memlist.rbegin());
if (seqno == kMaxSequenceNumber) {
// First mem in this batch.
seqno = mem->atomic_flush_seqno_;
batch.emplace_back(pos);
heap.pop();
} else if (mem->atomic_flush_seqno_ == seqno) {
// mem has the same atomic_flush_seqno_, thus in the same atomic flush.
batch.emplace_back(pos);
heap.pop();
} else if (mem->atomic_flush_seqno_ > seqno) {
// mem belongs to another atomic flush with higher seqno, break the
// loop.
break;
}
} while (!heap.empty());
if (seqno >= min_unfinished_seqno) {
// If there is an older, unfinished atomic flush, then we should not
// proceed.
TEST_SYNC_POINT_CALLBACK(
"MemTableList::TryInstallMemtableFlushResults:"
"HasOlderUnfinishedAtomicFlush:0",
nullptr);
break;
}
// Found the earliest, complete atomic flush. No earlier atomic flush is
// pending. Therefore ready to record it to the MANIFEST.
uint32_t num_entries = 0;
autovector<ColumnFamilyData*> tmp_cfds;
autovector<const MutableCFOptions*> tmp_mutable_cf_options_list;
std::vector<autovector<MemTable*>> memtables_to_flush;
autovector<autovector<VersionEdit*>> edit_lists;
for (auto pos : batch) {
tmp_cfds.emplace_back(cfds[pos]);
tmp_mutable_cf_options_list.emplace_back(mutable_cf_options_list[pos]);
const auto& memlist = imm_lists[pos]->current_->memlist_;
uint64_t batch_file_number = 0;
autovector<MemTable*> tmp_mems;
autovector<VersionEdit*> edits;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (!m->flush_completed_ ||
(it != memlist.rbegin() && m->file_number_ != batch_file_number)) {
break;
}
if (it == memlist.rbegin()) {
batch_file_number = m->file_number_;
edits.push_back(m->GetEdits());
++num_entries;
}
tmp_mems.push_back(m);
}
edit_lists.push_back(edits);
memtables_to_flush.push_back(tmp_mems);
}
TEST_SYNC_POINT_CALLBACK(
"MemTableList::TryInstallMemtableFlushResults:FoundBatchToCommit:0",
&num_entries);
// Mark the version edits as an atomic group
uint32_t remaining = num_entries;
for (auto& edit_list : edit_lists) {
assert(edit_list.size() == 1);
edit_list[0]->MarkAtomicGroup(--remaining);
}
assert(remaining == 0);
size_t batch_sz = batch.size();
assert(batch_sz > 0);
assert(batch_sz == memtables_to_flush.size());
assert(batch_sz == tmp_cfds.size());
assert(batch_sz == edit_lists.size());
if (vset->db_options()->allow_2pc) {
for (size_t i = 0; i != batch_sz; ++i) {
auto& edit_list = edit_lists[i];
assert(!edit_list.empty());
edit_list.back()->SetMinLogNumberToKeep(
PrecomputeMinLogNumberToKeep(vset, *tmp_cfds[i], edit_list,
memtables_to_flush[i], prep_tracker));
}
}
// this can release and reacquire the mutex.
s = vset->LogAndApply(tmp_cfds, tmp_mutable_cf_options_list, edit_lists, mu,
db_directory);
for (const auto pos : batch) {
imm_lists[pos]->InstallNewVersion();
}
if (s.ok() || s.IsShutdownInProgress()) {
for (size_t i = 0; i != batch_sz; ++i) {
if (tmp_cfds[i]->IsDropped()) {
continue;
}
size_t pos = batch[i];
for (auto m : memtables_to_flush[i]) {
assert(m->file_number_ > 0);
uint64_t mem_id = m->GetID();
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
tmp_cfds[i]->GetName().c_str(), m->file_number_,
mem_id);
imm_lists[pos]->current_->Remove(m, to_delete);
}
}
} else {
for (size_t i = 0; i != batch_sz; ++i) {
size_t pos = batch[i];
for (auto m : memtables_to_flush[i]) {
uint64_t mem_id = m->GetID();
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
tmp_cfds[i]->GetName().c_str(), m->file_number_,
mem_id);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
m->file_number_ = 0;
imm_lists[pos]->num_flush_not_started_++;
}
imm_lists[pos]->imm_flush_needed.store(true, std::memory_order_release);
}
}
// Adjust the heap AFTER installing new MemTableListVersions because the
// compare function 'comp' needs to capture the most up-to-date state of
// imm_lists.
for (auto pos : batch) {
const auto& memlist = imm_lists[pos]->current_->memlist_;
if (!memlist.empty()) {
MemTable* mem = *(memlist.rbegin());
if (mem->flush_completed_) {
heap.emplace(pos);
} else if (min_unfinished_seqno > mem->atomic_flush_seqno_) {
min_unfinished_seqno = mem->atomic_flush_seqno_;
}
}
}
}
*atomic_flush_commit_in_progress = false;
return s;
}
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
@ -749,4 +527,105 @@ uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
return min_log;
}
// Commit a successful atomic flush in the manifest file.
Status InstallMemtableAtomicFlushResults(
const autovector<MemTableList*>* imm_lists,
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData>& file_metas,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
mu->AssertHeld();
size_t num = mems_list.size();
assert(cfds.size() == num);
if (imm_lists != nullptr) {
assert(imm_lists->size() == num);
}
for (size_t k = 0; k != num; ++k) {
#ifndef NDEBUG
const auto* imm =
(imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
if (!mems_list[k]->empty()) {
assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
}
#endif
for (size_t i = 0; i != mems_list[k]->size(); ++i) {
assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
(*mems_list[k])[i]->SetFlushCompleted(true);
(*mems_list[k])[i]->SetFileNumber(file_metas[k].fd.GetNumber());
}
}
Status s;
autovector<autovector<VersionEdit*>> edit_lists;
uint32_t num_entries = 0;
for (const auto mems : mems_list) {
assert(mems != nullptr);
autovector<VersionEdit*> edits;
assert(!mems->empty());
edits.emplace_back((*mems)[0]->GetEdits());
++num_entries;
edit_lists.emplace_back(edits);
}
// Mark the version edits as an atomic group
for (auto& edits : edit_lists) {
assert(edits.size() == 1);
edits[0]->MarkAtomicGroup(--num_entries);
}
assert(0 == num_entries);
// this can release and reacquire the mutex.
s = vset->LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
db_directory);
for (size_t k = 0; k != cfds.size(); ++k) {
auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
imm->InstallNewVersion();
}
if (s.ok() || s.IsShutdownInProgress()) {
for (size_t i = 0; i != cfds.size(); ++i) {
if (cfds[i]->IsDropped()) {
continue;
}
auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
for (auto m : *mems_list[i]) {
assert(m->GetFileNumber() > 0);
uint64_t mem_id = m->GetID();
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfds[i]->GetName().c_str(), m->GetFileNumber(),
mem_id);
imm->current_->Remove(m, to_delete);
}
}
} else {
for (size_t i = 0; i != cfds.size(); ++i) {
auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
for (auto m : *mems_list[i]) {
uint64_t mem_id = m->GetID();
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
cfds[i]->GetName().c_str(), m->GetFileNumber(),
mem_id);
m->SetFlushCompleted(false);
m->SetFlushInProgress(false);
m->GetEdits()->Clear();
m->SetFileNumber(0);
imm->num_flush_not_started_++;
}
imm->imm_flush_needed.store(true, std::memory_order_release);
}
}
return s;
}
} // namespace rocksdb

@ -31,6 +31,7 @@ class ColumnFamilyData;
class InternalKeyComparator;
class InstrumentedMutex;
class MergeIteratorBuilder;
class MemTableList;
// keeps a list of immutable memtables in a vector. the list is immutable
// if refcount is bigger than one. It is used as a state for Get() and
@ -114,6 +115,18 @@ class MemTableListVersion {
SequenceNumber GetEarliestSequenceNumber(bool include_history = false) const;
private:
friend class MemTableList;
friend Status InstallMemtableAtomicFlushResults(
const autovector<MemTableList*>* imm_lists,
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
// REQUIRE: m is an immutable memtable
void Add(MemTable* m, autovector<MemTable*>* to_delete);
// REQUIRE: m is an immutable memtable
@ -132,8 +145,6 @@ class MemTableListVersion {
void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m);
friend class MemTableList;
// Immutable MemTables that have not yet been flushed.
std::list<MemTable*> memlist_;
@ -163,18 +174,6 @@ class MemTableListVersion {
// write thread.)
class MemTableList {
public:
// Commit a successful atomic flush in the manifest file
static Status TryInstallMemtableFlushResults(
autovector<MemTableList*>& imm_lists,
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
bool* atomic_flush_commit_in_progress, LogsWithPrepTracker* prep_tracker,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
// A list of memtables.
explicit MemTableList(int min_write_buffer_number_to_merge,
int max_write_buffer_number_to_maintain)
@ -296,6 +295,16 @@ class MemTableList {
}
private:
friend Status InstallMemtableAtomicFlushResults(
const autovector<MemTableList*>* imm_lists,
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
// DB mutex held
void InstallNewVersion();
@ -317,4 +326,18 @@ class MemTableList {
size_t current_memory_usage_;
};
// Installs memtable atomic flush results.
// In most cases, imm_lists is nullptr, and the function simply uses the
// immutable memtable lists associated with the cfds. There are unit tests that
// installs flush results for external immutable memtable lists other than the
// cfds' own immutable memtable lists, e.g. MemTableLIstTest. In this case,
// imm_lists parameter is not nullptr.
extern Status InstallMemtableAtomicFlushResults(
const autovector<MemTableList*>* imm_lists,
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
} // namespace rocksdb

@ -85,17 +85,46 @@ class MemTableListTest : public testing::Test {
Status Mock_InstallMemtableFlushResults(
MemTableList* list, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& m, autovector<MemTable*>* to_delete) {
autovector<MemTableList*> lists;
lists.emplace_back(list);
autovector<const autovector<MemTable*>*> mems_list;
mems_list.emplace_back(&m);
return Mock_InstallMemtableFlushResults(
lists, {0} /* cf_ids */, {&mutable_cf_options}, mems_list, to_delete);
// Create a mock Logger
test::NullLogger logger;
LogBuffer log_buffer(DEBUG_LEVEL, &logger);
CreateDB();
// Create a mock VersionSet
DBOptions db_options;
ImmutableDBOptions immutable_db_options(db_options);
EnvOptions env_options;
std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
WriteController write_controller(10000000u);
VersionSet versions(dbname, &immutable_db_options, env_options,
table_cache.get(), &write_buffer_manager,
&write_controller);
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());
cf_descs.emplace_back("two", ColumnFamilyOptions());
EXPECT_OK(versions.Recover(cf_descs, false));
// Create mock default ColumnFamilyData
auto column_family_set = versions.GetColumnFamilySet();
LogsWithPrepTracker dummy_prep_tracker;
auto cfd = column_family_set->GetDefault();
EXPECT_TRUE(nullptr != cfd);
uint64_t file_num = file_number.fetch_add(1);
// Create dummy mutex.
InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex);
return list->TryInstallMemtableFlushResults(
cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
file_num, to_delete, nullptr, &log_buffer);
}
// Calls MemTableList::InstallMemtableFlushResults() and sets up all
// structures needed to call this function.
Status Mock_InstallMemtableFlushResults(
Status Mock_InstallMemtableAtomicFlushResults(
autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
@ -127,25 +156,6 @@ class MemTableListTest : public testing::Test {
auto column_family_set = versions.GetColumnFamilySet();
LogsWithPrepTracker dummy_prep_tracker;
if (1 == cf_ids.size()) {
auto cfd = column_family_set->GetColumnFamily(cf_ids[0]);
EXPECT_TRUE(nullptr != cfd);
EXPECT_EQ(1, lists.size());
MemTableList* list = lists[0];
EXPECT_EQ(1, mutable_cf_options_list.size());
const MutableCFOptions& mutable_cf_options =
*(mutable_cf_options_list.at(0));
const autovector<MemTable*>* mems = mems_list.at(0);
EXPECT_TRUE(nullptr != mems);
uint64_t file_num = file_number.fetch_add(1);
// Create dummy mutex.
InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex);
return list->TryInstallMemtableFlushResults(
cfd, mutable_cf_options, *mems, &dummy_prep_tracker, &versions,
&mutex, file_num, to_delete, nullptr, &log_buffer);
}
autovector<ColumnFamilyData*> cfds;
for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) {
cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
@ -158,13 +168,11 @@ class MemTableListTest : public testing::Test {
meta.fd = FileDescriptor(file_num, 0, 0);
file_metas.emplace_back(meta);
}
bool atomic_flush_commit_in_progress = false;
InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex);
return MemTableList::TryInstallMemtableFlushResults(
lists, cfds, mutable_cf_options_list, mems_list,
&atomic_flush_commit_in_progress, &dummy_prep_tracker, &versions,
&mutex, file_metas, to_delete, nullptr, &log_buffer);
return InstallMemtableAtomicFlushResults(
&lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
file_metas, to_delete, nullptr, &log_buffer);
}
};
@ -730,18 +738,28 @@ TEST_F(MemTableListTest, FlushPendingTest) {
to_delete.clear();
}
TEST_F(MemTableListTest, FlushMultipleCFsTest) {
TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
autovector<MemTableList*> lists;
autovector<uint32_t> cf_ids;
autovector<const MutableCFOptions*> options_list;
autovector<const autovector<MemTable*>*> to_flush;
autovector<MemTable*> to_delete;
Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list,
to_flush, &to_delete);
ASSERT_OK(s);
ASSERT_TRUE(to_delete.empty());
}
TEST_F(MemTableListTest, AtomicFlusTest) {
const int num_cfs = 3;
const int num_tables_per_cf = 5;
const int num_tables_per_cf = 2;
SequenceNumber seq = 1;
Status s;
auto factory = std::make_shared<SkipListFactory>();
options.memtable_factory = factory;
ImmutableCFOptions ioptions(options);
InternalKeyComparator cmp(BytewiseComparator());
WriteBufferManager wb(options.db_write_buffer_size);
autovector<MemTable*> to_delete;
// Create MemTableLists
int min_write_buffer_number_to_merge = 3;
@ -782,135 +800,72 @@ TEST_F(MemTableListTest, FlushMultipleCFsTest) {
std::vector<autovector<MemTable*>> flush_candidates(num_cfs);
// Nothing to flush
for (int i = 0; i != num_cfs; ++i) {
auto list = lists[i];
for (auto i = 0; i != num_cfs; ++i) {
auto* list = lists[i];
ASSERT_FALSE(list->IsFlushPending());
ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
ASSERT_EQ(0, static_cast<int>(flush_candidates[i].size()));
ASSERT_EQ(0, flush_candidates[i].size());
}
// Request flush even though there is nothing to flush
for (int i = 0; i != num_cfs; ++i) {
auto list = lists[i];
for (auto i = 0; i != num_cfs; ++i) {
auto* list = lists[i];
list->FlushRequested();
ASSERT_FALSE(list->IsFlushPending());
ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
}
// Add tables to column families
for (int i = 0; i != num_cfs; ++i) {
for (int j = 0; j != num_tables_per_cf; ++j) {
autovector<MemTable*> to_delete;
// Add tables to the immutable memtalbe lists associated with column families
for (auto i = 0; i != num_cfs; ++i) {
for (auto j = 0; j != num_tables_per_cf; ++j) {
lists[i]->Add(tables[i][j], &to_delete);
}
ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
ASSERT_TRUE(lists[i]->IsFlushPending());
ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
}
autovector<const autovector<MemTable*>*> to_flush;
std::vector<uint64_t> prev_memtable_ids;
// For each column family, determine the memtables to flush
for (int k = 0; k != 4; ++k) {
std::vector<uint64_t> flush_memtable_ids;
if (0 == k) {
std::vector<uint64_t> flush_memtable_ids = {1, 1, 0};
// +----+
// list[0]: |0 1| 2 3 4
// list[1]: |0 1| 2 3 4
// list[0]: |0 1|
// list[1]: |0 1|
// | +--+
// list[2]: |0| 1 2 3 4
// +-+
flush_memtable_ids = {1, 1, 0};
} else if (1 == k) {
// +----+ +---+
// list[0]: |0 1| |2 3| 4
// list[1]: |0 1| |2 3| 4
// | +--+ +---+
// list[2]: |0| 1 2 3 4
// list[2]: |0| 1
// +-+
flush_memtable_ids = {3, 3, 0};
} else if (2 == k) {
// +-----+ +---+
// list[0]: |0 1| |2 3| 4
// list[1]: |0 1| |2 3| 4
// | +---+ +---+
// | | +-------+
// list[2]: |0| |1 2 3| 4
// +-+ +-------+
flush_memtable_ids = {3, 3, 3};
} else {
// +-----+ +---+ +-+
// list[0]: |0 1| |2 3| |4|
// list[1]: |0 1| |2 3| |4|
// | +---+ +---+ | |
// | | +-------+ | |
// list[2]: |0| |1 2 3| |4|
// +-+ +-------+ +-+
flush_memtable_ids = {4, 4, 4};
}
assert(num_cfs == static_cast<int>(flush_memtable_ids.size()));
// Pick memtables to flush
for (int i = 0; i != num_cfs; ++i) {
for (auto i = 0; i != num_cfs; ++i) {
flush_candidates[i].clear();
lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
&flush_candidates[i]);
for (auto mem : flush_candidates[i]) {
mem->TEST_AtomicFlushSequenceNumber() = SequenceNumber(k);
}
if (prev_memtable_ids.empty()) {
ASSERT_EQ(flush_memtable_ids[i] - 0 + 1, flush_candidates[i].size());
} else {
ASSERT_EQ(flush_memtable_ids[i] - prev_memtable_ids[i],
flush_candidates[i].size());
}
ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
ASSERT_FALSE(lists[i]->HasFlushRequested());
if (flush_memtable_ids[i] == num_tables_per_cf - 1) {
ASSERT_FALSE(
lists[i]->imm_flush_needed.load(std::memory_order_acquire));
} else {
ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
static_cast<uint64_t>(flush_candidates[i].size()));
}
autovector<MemTableList*> tmp_lists;
autovector<uint32_t> tmp_cf_ids;
autovector<const MutableCFOptions*> tmp_options_list;
autovector<const autovector<MemTable*>*> to_flush;
for (auto i = 0; i != num_cfs; ++i) {
if (!flush_candidates[i].empty()) {
to_flush.push_back(&flush_candidates[i]);
tmp_lists.push_back(lists[i]);
tmp_cf_ids.push_back(i);
tmp_options_list.push_back(mutable_cf_options_list[i]);
}
prev_memtable_ids = flush_memtable_ids;
if (k < 3) {
for (const auto& mems : flush_candidates) {
uint64_t file_num = file_number.fetch_add(1);
for (auto m : mems) {
m->TEST_SetFlushCompleted(true);
m->TEST_SetFileNumber(file_num);
}
}
}
if (k == 0) {
// Rollback first pick of tables
for (int i = 0; i != num_cfs; ++i) {
auto list = lists[i];
const auto& mems = flush_candidates[i];
for (auto m : mems) {
m->TEST_SetFileNumber(0);
}
list->RollbackMemtableFlush(flush_candidates[i], 0);
ASSERT_TRUE(list->IsFlushPending());
ASSERT_TRUE(list->imm_flush_needed.load(std::memory_order_acquire));
}
prev_memtable_ids.clear();
}
Status s = Mock_InstallMemtableAtomicFlushResults(
tmp_lists, tmp_cf_ids, tmp_options_list, to_flush, &to_delete);
ASSERT_OK(s);
if (k == 3) {
for (int i = 0; i != num_cfs; ++i) {
to_flush.emplace_back(&flush_candidates[i]);
for (auto i = 0; i != num_cfs; ++i) {
for (auto j = 0; j != num_tables_per_cf; ++j) {
if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) {
ASSERT_LT(0, tables[i][j]->GetFileNumber());
}
}
ASSERT_EQ(
static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(),
lists[i]->NumNotFlushed());
}
s = Mock_InstallMemtableFlushResults(lists, cf_ids, mutable_cf_options_list,
to_flush, &to_delete);
ASSERT_OK(s);
to_delete.clear();
for (auto list : lists) {
list->current()->Unref(&to_delete);
@ -932,126 +887,6 @@ TEST_F(MemTableListTest, FlushMultipleCFsTest) {
ASSERT_EQ(m, m->Unref());
delete m;
}
to_delete.clear();
}
TEST_F(MemTableListTest, HasOlderAtomicFlush) {
const size_t num_cfs = 3;
const size_t num_memtables_per_cf = 2;
SequenceNumber seq = 1;
Status s;
auto factory = std::make_shared<SkipListFactory>();
options.memtable_factory = factory;
ImmutableCFOptions ioptions(options);
InternalKeyComparator cmp(BytewiseComparator());
WriteBufferManager wb(options.db_write_buffer_size);
autovector<MemTable*> to_delete;
// Create MemTableLists
int min_write_buffer_number_to_merge = 3;
int max_write_buffer_number_to_maintain = 7;
autovector<MemTableList*> lists;
for (size_t i = 0; i != num_cfs; ++i) {
lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
max_write_buffer_number_to_maintain));
}
autovector<uint32_t> cf_ids;
std::vector<std::vector<MemTable*>> tables;
autovector<const MutableCFOptions*> mutable_cf_options_list;
uint32_t cf_id = 0;
for (size_t k = 0; k != num_cfs; ++k) {
std::vector<MemTable*> elem;
mutable_cf_options_list.emplace_back(new MutableCFOptions(options));
uint64_t memtable_id = 0;
for (int i = 0; i != num_memtables_per_cf; ++i) {
MemTable* mem =
new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb,
kMaxSequenceNumber, cf_id);
mem->SetID(memtable_id++);
mem->Ref();
std::string value;
mem->Add(++seq, kTypeValue, "key1", ToString(i));
mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
elem.push_back(mem);
}
tables.emplace_back(elem);
cf_ids.push_back(cf_id++);
}
// Add tables to column families' immutable memtable lists
for (size_t i = 0; i != num_cfs; ++i) {
for (size_t j = 0; j != num_memtables_per_cf; ++j) {
lists[i]->Add(tables[i][j], &to_delete);
}
lists[i]->FlushRequested();
ASSERT_EQ(num_memtables_per_cf, lists[i]->NumNotFlushed());
ASSERT_TRUE(lists[i]->IsFlushPending());
ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
}
std::vector<autovector<MemTable*>> flush_candidates(num_cfs);
for (size_t i = 0; i != num_cfs; ++i) {
lists[i]->PickMemtablesToFlush(nullptr, &flush_candidates[i]);
for (auto m : flush_candidates[i]) {
m->TEST_AtomicFlushSequenceNumber() = 123;
}
lists[i]->RollbackMemtableFlush(flush_candidates[i], 0);
}
uint64_t memtable_id = num_memtables_per_cf - 1;
autovector<MemTable*> other_flush_candidates;
lists[0]->PickMemtablesToFlush(&memtable_id, &other_flush_candidates);
for (auto m : other_flush_candidates) {
m->TEST_AtomicFlushSequenceNumber() = 124;
m->TEST_SetFlushCompleted(true);
m->TEST_SetFileNumber(1);
}
autovector<const autovector<MemTable*>*> to_flush;
to_flush.emplace_back(&other_flush_candidates);
bool has_older_unfinished_atomic_flush = false;
bool found_batch_to_commit = false;
SyncPoint::GetInstance()->SetCallBack(
"MemTableList::TryInstallMemtableFlushResults:"
"HasOlderUnfinishedAtomicFlush:0",
[&](void* /*arg*/) { has_older_unfinished_atomic_flush = true; });
SyncPoint::GetInstance()->SetCallBack(
"MemTableList::TryInstallMemtableFlushResults:FoundBatchToCommit:0",
[&](void* /*arg*/) { found_batch_to_commit = true; });
SyncPoint::GetInstance()->EnableProcessing();
s = Mock_InstallMemtableFlushResults(lists, cf_ids, mutable_cf_options_list,
to_flush, &to_delete);
ASSERT_OK(s);
ASSERT_TRUE(has_older_unfinished_atomic_flush);
ASSERT_FALSE(found_batch_to_commit);
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_TRUE(to_delete.empty());
for (auto list : lists) {
list->current()->Unref(&to_delete);
delete list;
}
lists.clear();
ASSERT_EQ(num_cfs * num_memtables_per_cf, to_delete.size());
for (auto m : to_delete) {
m->Ref();
ASSERT_EQ(m, m->Unref());
delete m;
}
to_delete.clear();
for (auto& opts : mutable_cf_options_list) {
delete opts;
opts = nullptr;
}
mutable_cf_options_list.clear();
}
} // namespace rocksdb

@ -1019,7 +1019,9 @@ TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
auto cfd_to_drop =
versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);
ASSERT_NE(nullptr, cfd_to_drop);
cfd_to_drop->Ref(); // Increase its refcount because cfd_to_drop is used later
// Increase its refcount because cfd_to_drop is used later, and we need to
// prevent it from being deleted.
cfd_to_drop->Ref();
drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());
mutex_.Lock();
s = versions_->LogAndApply(cfd_to_drop,

Loading…
Cancel
Save