Retire superfluous functions introduced in earlier mempurge PRs. (#8558)

Summary:
The main challenge to make the memtable garbage collection prototype (nicknamed `mempurge`) was to not get rid of WAL files that contain unflushed (but mempurged) data. That was successfully guaranteed by not writing the VersionEdit to the MANIFEST file after a successful mempurge.
By not writing VersionEdits to the `MANIFEST` file after a succesful mempurge operation, we do not change the earliest log file number that contains unflushed data: `cfd->GetLogNumber()` (`cfd->SetLogNumber()` is only called in `VersionSet::ProcessManifestWrites`). As a result, a number of functions introduced earlier just for the mempurge operation are not obscolete/redundant. (e.g.: `FlushJob::ExtractEarliestLogFileNumber`), and this PR aims at cleaning up all these now-unnecessary functions. In particular, we no longer need to store the earliest log file number in the `MemTable` struct itself. This PR therefore also reverts the `MemTable` struct to its original form.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8558

Test Plan: Already included in `db_flush_test.cc`.

Reviewed By: anand1976

Differential Revision: D29764351

Pulled By: bjlemaire

fbshipit-source-id: 0f43b260fa270251862512f397d3f24ee62e8437
main
Baptiste Lemaire 4 years ago committed by Facebook GitHub Bot
parent 61c9bd49c1
commit c521a9ab2b
  1. 11
      db/column_family.cc
  2. 5
      db/column_family.h
  3. 7
      db/db_impl/db_impl_open.cc
  4. 4
      db/db_impl/db_impl_secondary.cc
  5. 3
      db/db_impl/db_impl_write.cc
  6. 15
      db/flush_job.cc
  7. 1
      db/flush_job.h
  8. 4
      db/memtable.cc
  9. 17
      db/memtable.h
  10. 22
      db/memtable_list.cc
  11. 4
      db/memtable_list.h
  12. 2
      db/repair.cc
  13. 5
      db/version_edit_handler.cc
  14. 2
      db/version_set.cc
  15. 18
      db/version_set.h

@ -1059,20 +1059,17 @@ uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
} }
MemTable* ColumnFamilyData::ConstructNewMemtable( MemTable* ColumnFamilyData::ConstructNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq, const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
uint64_t log_number) {
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options, return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
write_buffer_manager_, earliest_seq, id_, log_number); write_buffer_manager_, earliest_seq, id_);
} }
void ColumnFamilyData::CreateNewMemtable( void ColumnFamilyData::CreateNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq, const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
uint64_t log_number) {
if (mem_ != nullptr) { if (mem_ != nullptr) {
delete mem_->Unref(); delete mem_->Unref();
} }
SetMemtable( SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
ConstructNewMemtable(mutable_cf_options, earliest_seq, log_number));
mem_->Ref(); mem_->Ref();
} }

@ -371,10 +371,9 @@ class ColumnFamilyData {
// See Memtable constructor for explanation of earliest_seq param. // See Memtable constructor for explanation of earliest_seq param.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options, MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq, SequenceNumber earliest_seq);
uint64_t log_number = 0);
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options, void CreateNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq, uint64_t log_number = 0); SequenceNumber earliest_seq);
TableCache* table_cache() const { return table_cache_.get(); } TableCache* table_cache() const { return table_cache_.get(); }
BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); } BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); }

@ -630,7 +630,7 @@ Status DBImpl::Recover(
// Clear memtables if recovery failed // Clear memtables if recovery failed
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
kMaxSequenceNumber, cfd->GetLogNumber()); kMaxSequenceNumber);
} }
} }
} }
@ -1066,7 +1066,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
flushed = true; flushed = true;
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
*next_sequence, cfd->GetLogNumber()); *next_sequence);
} }
} }
} }
@ -1204,8 +1204,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
flushed = true; flushed = true;
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
versions_->LastSequence(), versions_->LastSequence());
cfd->GetLogNumber());
} }
data_seen = true; data_seen = true;
} }

@ -256,8 +256,8 @@ Status DBImplSecondary::RecoverLogFiles(
curr_log_num != log_number)) { curr_log_num != log_number)) {
const MutableCFOptions mutable_cf_options = const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions(); *cfd->GetLatestMutableCFOptions();
MemTable* new_mem = cfd->ConstructNewMemtable( MemTable* new_mem =
mutable_cf_options, seq_of_batch, log_number); cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
cfd->mem()->SetNextLogNumber(log_number); cfd->mem()->SetNextLogNumber(log_number);
cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free); cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
new_mem->Ref(); new_mem->Ref();

@ -1800,8 +1800,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
} }
if (s.ok()) { if (s.ok()) {
SequenceNumber seq = versions_->LastSequence(); SequenceNumber seq = versions_->LastSequence();
new_mem = new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
cfd->ConstructNewMemtable(mutable_cf_options, seq, new_log_number);
context->superversion_context.NewSuperVersion(); context->superversion_context.NewSuperVersion();
} }
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,

@ -333,17 +333,6 @@ void FlushJob::Cancel() {
base_->Unref(); base_->Unref();
} }
uint64_t FlushJob::ExtractEarliestLogFileNumber() {
uint64_t earliest_logno = 0;
for (MemTable* m : mems_) {
uint64_t logno = m->GetEarliestLogFileNumber();
if (logno > 0 && (earliest_logno == 0 || logno < earliest_logno)) {
earliest_logno = logno;
}
}
return earliest_logno;
}
Status FlushJob::MemPurge() { Status FlushJob::MemPurge() {
Status s; Status s;
db_mutex_->AssertHeld(); db_mutex_->AssertHeld();
@ -387,8 +376,6 @@ Status FlushJob::MemPurge() {
NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(), NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(),
static_cast<int>(memtables.size()), &arena)); static_cast<int>(memtables.size()), &arena));
uint64_t earliest_logno = ExtractEarliestLogFileNumber();
auto* ioptions = cfd_->ioptions(); auto* ioptions = cfd_->ioptions();
// Place iterator at the First (meaning most recent) key node. // Place iterator at the First (meaning most recent) key node.
@ -429,7 +416,7 @@ Status FlushJob::MemPurge() {
new_mem = new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()), new_mem = new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()),
mutable_cf_options_, cfd_->write_buffer_mgr(), mutable_cf_options_, cfd_->write_buffer_mgr(),
earliest_seqno, cfd_->GetID(), earliest_logno); earliest_seqno, cfd_->GetID());
assert(new_mem != nullptr); assert(new_mem != nullptr);
Env* env = db_options_.env; Env* env = db_options_.env;

@ -123,7 +123,6 @@ class FlushJob {
// recommend all users not to set this flag as true given that the MemPurge // recommend all users not to set this flag as true given that the MemPurge
// process has not matured yet. // process has not matured yet.
Status MemPurge(); Status MemPurge();
uint64_t ExtractEarliestLogFileNumber();
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const; std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE

@ -67,8 +67,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
const ImmutableOptions& ioptions, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager, WriteBufferManager* write_buffer_manager,
SequenceNumber latest_seq, uint32_t column_family_id, SequenceNumber latest_seq, uint32_t column_family_id)
uint64_t current_logfile_number)
: comparator_(cmp), : comparator_(cmp),
moptions_(ioptions, mutable_cf_options), moptions_(ioptions, mutable_cf_options),
refs_(0), refs_(0),
@ -99,7 +98,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
earliest_seqno_(latest_seq), earliest_seqno_(latest_seq),
creation_seq_(latest_seq), creation_seq_(latest_seq),
mem_next_logfile_number_(0), mem_next_logfile_number_(0),
mem_min_logfile_number_(current_logfile_number),
min_prep_log_referenced_(0), min_prep_log_referenced_(0),
locks_(moptions_.inplace_update_support locks_(moptions_.inplace_update_support
? moptions_.inplace_update_num_locks ? moptions_.inplace_update_num_locks

@ -106,8 +106,7 @@ class MemTable {
const ImmutableOptions& ioptions, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager, WriteBufferManager* write_buffer_manager,
SequenceNumber earliest_seq, uint32_t column_family_id, SequenceNumber earliest_seq, uint32_t column_family_id);
uint64_t current_logfile_number = 0);
// No copying allowed // No copying allowed
MemTable(const MemTable&) = delete; MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete; MemTable& operator=(const MemTable&) = delete;
@ -388,16 +387,6 @@ class MemTable {
// operations on the same MemTable. // operations on the same MemTable.
void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; } void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
// Set the earliest log file number that (possibly)
// contains entries from this memtable.
void SetEarliestLogFileNumber(uint64_t logno) {
mem_min_logfile_number_ = logno;
}
// Return the earliest log file number that (possibly)
// contains entries from this memtable.
uint64_t GetEarliestLogFileNumber() { return mem_min_logfile_number_; }
// if this memtable contains data from a committed // if this memtable contains data from a committed
// two phase transaction we must take note of the // two phase transaction we must take note of the
// log which contains that data so we can know // log which contains that data so we can know
@ -528,10 +517,6 @@ class MemTable {
// The log files earlier than this number can be deleted. // The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_; uint64_t mem_next_logfile_number_;
// The earliest log containing entries inserted into
// this memtable.
uint64_t mem_min_logfile_number_;
// the earliest log containing a prepared section // the earliest log containing a prepared section
// which has been inserted into this memtable. // which has been inserted into this memtable.
std::atomic<uint64_t> min_prep_log_referenced_; std::atomic<uint64_t> min_prep_log_referenced_;

@ -521,9 +521,14 @@ Status MemTableList::TryInstallMemtableFlushResults(
// and don't commit anything to the manifest file. // and don't commit anything to the manifest file.
RemoveMemTablesOrRestoreFlags(s, cfd, batch_count, log_buffer, RemoveMemTablesOrRestoreFlags(s, cfd, batch_count, log_buffer,
to_delete, mu); to_delete, mu);
// Note: cfd->SetLogNumber is only called when a VersionEdit
// is written to MANIFEST. When mempurge is succesful, we skip
// this step, therefore cfd->GetLogNumber is always is
// earliest log with data unflushed.
// Notify new head of manifest write queue. // Notify new head of manifest write queue.
// wake up all the waiting writers // wake up all the waiting writers
// TODO(bjlemaire): explain full reason needed or investigate more. // TODO(bjlemaire): explain full reason WakeUpWaitingManifestWriters
// needed or investigate more.
vset->WakeUpWaitingManifestWriters(); vset->WakeUpWaitingManifestWriters();
*io_s = IOStatus::OK(); *io_s = IOStatus::OK();
} }
@ -694,21 +699,6 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
} }
} }
// Returns the earliest log that possibly contain entries
// from one of the memtables of this memtable_list.
uint64_t MemTableList::EarliestLogContainingData() {
uint64_t min_log = 0;
for (auto& m : current_->memlist_) {
uint64_t log = m->GetEarliestLogFileNumber();
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
}
}
return min_log;
}
uint64_t MemTableList::PrecomputeMinLogContainingPrepSection( uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
const std::unordered_set<MemTable*>* memtables_to_flush) { const std::unordered_set<MemTable*>* memtables_to_flush) {
uint64_t min_log = 0; uint64_t min_log = 0;

@ -347,10 +347,6 @@ class MemTableList {
size_t* current_memory_usage() { return &current_memory_usage_; } size_t* current_memory_usage() { return &current_memory_usage_; }
// Returns the earliest log that possibly contain entries
// from one of the memtables of this memtable_list.
uint64_t EarliestLogContainingData();
// Returns the min log containing the prep section after memtables listsed in // Returns the min log containing the prep section after memtables listsed in
// `memtables_to_flush` are flushed and their status is persisted in manifest. // `memtables_to_flush` are flushed and their status is persisted in manifest.
uint64_t PrecomputeMinLogContainingPrepSection( uint64_t PrecomputeMinLogContainingPrepSection(

@ -387,7 +387,7 @@ class Repairer {
// Initialize per-column family memtables // Initialize per-column family memtables
for (auto* cfd : *vset_.GetColumnFamilySet()) { for (auto* cfd : *vset_.GetColumnFamilySet()) {
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
kMaxSequenceNumber, cfd->GetLogNumber()); kMaxSequenceNumber);
} }
auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet()); auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet());

@ -548,11 +548,6 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
"records NOT monotonically increasing"); "records NOT monotonically increasing");
} else { } else {
cfd->SetLogNumber(edit.log_number_); cfd->SetLogNumber(edit.log_number_);
if (version_set_->db_options()->experimental_allow_mempurge &&
edit.log_number_ > 0 &&
(cfd->mem()->GetEarliestLogFileNumber() == 0)) {
cfd->mem()->SetEarliestLogFileNumber(edit.log_number_);
}
version_edit_params_.SetLogNumber(edit.log_number_); version_edit_params_.SetLogNumber(edit.log_number_);
} }
} }

@ -5646,7 +5646,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
// GetLatestMutableCFOptions() is safe here without mutex since the // GetLatestMutableCFOptions() is safe here without mutex since the
// cfd is not available to client // cfd is not available to client
new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(), new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
LastSequence(), edit->log_number_); LastSequence());
new_cfd->SetLogNumber(edit->log_number_); new_cfd->SetLogNumber(edit->log_number_);
return new_cfd; return new_cfd;
} }

@ -1161,15 +1161,6 @@ class VersionSet {
if (min_log_num > num && !cfd->IsDropped()) { if (min_log_num > num && !cfd->IsDropped()) {
min_log_num = num; min_log_num = num;
} }
// If mempurge is activated, there may be an immutable memtable
// that has data not flushed to any SST file.
if (db_options_->experimental_allow_mempurge && !(cfd->IsEmpty()) &&
!(cfd->IsDropped())) {
num = cfd->imm()->EarliestLogContainingData();
if ((num > 0) && (min_log_num > num)) {
min_log_num = num;
}
}
} }
return min_log_num; return min_log_num;
} }
@ -1187,15 +1178,6 @@ class VersionSet {
if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) { if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
min_log_num = cfd->GetLogNumber(); min_log_num = cfd->GetLogNumber();
} }
// If mempurge is activated, there may be an immutable memtable
// that has data not flushed to any SST file.
if (db_options_->experimental_allow_mempurge && !(cfd->IsEmpty()) &&
!(cfd->IsDropped())) {
uint64_t num = cfd->imm()->EarliestLogContainingData();
if ((num > 0) && (min_log_num > num)) {
min_log_num = num;
}
}
} }
return min_log_num; return min_log_num;
} }

Loading…
Cancel
Save