Enable flushing memtables from arbitrary column families

Summary: Removed default_cfd_ from all flush code paths. This means we can now flush memtables from arbitrary column families!

Test Plan: Added a new unit test

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15789
main
Igor Canadi 11 years ago
parent 9ca638a86d
commit 3615f534d1
  1. 37
      db/column_family_test.cc
  2. 151
      db/db_impl.cc
  3. 10
      db/db_impl.h
  4. 37
      db/version_set.cc
  5. 9
      db/version_set.h

@ -66,6 +66,9 @@ class ColumnFamilyTest {
Status Merge(int cf, const string& key, const string& value) {
return db_->Merge(WriteOptions(), handles_[cf], Slice(key), Slice(value));
}
Status Flush(int cf) {
return db_->Flush(FlushOptions(), handles_[cf]);
}
string Get(int cf, const string& key) {
ReadOptions options;
@ -238,6 +241,40 @@ TEST(ColumnFamilyTest, IgnoreRecoveredLog) {
}
}
TEST(ColumnFamilyTest, FlushTest) {
ASSERT_OK(Open({"default"}));
CreateColumnFamilies({"one", "two"});
Close();
ASSERT_OK(Open({"default", "one", "two"}));
ASSERT_OK(Put(0, "foo", "v1"));
ASSERT_OK(Put(0, "bar", "v2"));
ASSERT_OK(Put(1, "mirko", "v3"));
ASSERT_OK(Put(0, "foo", "v2"));
ASSERT_OK(Put(2, "fodor", "v5"));
for (int i = 0; i < 3; ++i) {
Flush(i);
}
Close();
ASSERT_OK(Open({"default", "one", "two"}));
for (int iter = 0; iter <= 2; ++iter) {
ASSERT_EQ("v2", Get(0, "foo"));
ASSERT_EQ("v2", Get(0, "bar"));
ASSERT_EQ("v3", Get(1, "mirko"));
ASSERT_EQ("v5", Get(2, "fodor"));
ASSERT_EQ("NOT_FOUND", Get(0, "fodor"));
ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
ASSERT_EQ("NOT_FOUND", Get(2, "foo"));
if (iter <= 1) {
// reopen
Close();
ASSERT_OK(Open({"default", "one", "two"}));
}
}
Close();
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -317,8 +317,12 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
DBImpl::~DBImpl() {
// Wait for background work to finish
if (flush_on_destroy_ && default_cfd_->mem()->GetFirstSequenceNumber() != 0) {
FlushMemTable(FlushOptions());
if (flush_on_destroy_) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->GetFirstSequenceNumber() != 0) {
FlushMemTable(cfd, FlushOptions());
}
}
}
mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-nullptr value is ok
@ -979,6 +983,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->ApproximateMemoryUsage() >
cfd->options()->write_buffer_size) {
// If this asserts, it means that ColumnFamilyMemTablesImpl failed in
// filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= log_number);
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
@ -1001,8 +1008,20 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
// flush the final memtable
status = WriteLevel0TableForRecovery(cfd->mem(), edit);
if (cfd->GetLogNumber() > log_number) {
// Column family cfd has already flushed the data
// from log_number. Memtable has to be empty because
// we filter the updates based on log_number
// (in ColumnFamilyMemTablesImpl)
assert(cfd->mem()->GetFirstSequenceNumber() == 0);
assert(edit->NumEntries() == 0);
continue;
}
// flush the final memtable (if non-empty)
if (cfd->mem()->GetFirstSequenceNumber() != 0) {
status = WriteLevel0TableForRecovery(cfd->mem(), edit);
}
// we still want to clear the memtable, even if the recovery failed
cfd->CreateNewMemtable();
if (!status.ok()) {
@ -1016,6 +1035,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
// Since we already recovered log_number, we want all logs
// with numbers `<= log_number` (includes this one) to be ignored
edit->SetLogNumber(log_number + 1);
// we must mark the next log number as used, even though it's
// not actually used. that is because VersionSet assumes
// VersionSet::next_file_number_ always to be strictly greater than any
// log
// number
versions_->MarkFileNumberUsed(log_number + 1);
status = versions_->LogAndApply(cfd, edit, &mutex_);
if (!status.ok()) {
return status;
@ -1077,8 +1102,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
return s;
}
Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
std::vector<MemTable*>& mems, VersionEdit* edit,
uint64_t* filenumber) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
@ -1090,7 +1115,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
const SequenceNumber earliest_seqno_in_memtable =
mems[0]->GetFirstSequenceNumber();
Version* base = default_cfd_->current();
Version* base = cfd->current();
base->Ref(); // it is likely that we do not need this reference
Status s;
{
@ -1127,7 +1152,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
// re-acquire the most current version
base = default_cfd_->current();
base = cfd->current();
// There could be multiple threads writing to its own level-0 file.
// The pending_outputs cannot be cleared here, otherwise this newly
@ -1149,7 +1174,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
// threads could be concurrently producing compacted files for
// that key range.
if (base != nullptr && options_.max_background_compactions <= 1 &&
options_.compaction_style == kCompactionStyleLevel) {
cfd->options()->compaction_style == kCompactionStyleLevel) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.number, meta.file_size,
@ -1165,12 +1190,13 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
return s;
}
Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
bool* madeProgress,
DeletionState& deletion_state) {
mutex_.AssertHeld();
assert(default_cfd_->imm()->size() != 0);
assert(cfd->imm()->size() != 0);
if (!default_cfd_->imm()->IsFlushPending()) {
if (!cfd->imm()->IsFlushPending()) {
Log(options_.info_log, "FlushMemTableToOutputFile already in progress");
return Status::IOError("FlushMemTableToOutputFile already in progress");
}
@ -1178,7 +1204,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
// Save the contents of the earliest memtable as a new Table
uint64_t file_number;
std::vector<MemTable*> mems;
default_cfd_->imm()->PickMemtablesToFlush(&mems);
cfd->imm()->PickMemtablesToFlush(&mems);
if (mems.empty()) {
Log(options_.info_log, "Nothing in memstore to flush");
return Status::IOError("Nothing in memstore to flush");
@ -1193,9 +1219,8 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
edit->SetPrevLogNumber(0);
// SetLogNumber(log_num) indicates logs with number smaller than log_num
// will no longer be picked up for recovery.
edit->SetLogNumber(
mems.back()->GetNextLogNumber()
);
edit->SetLogNumber(mems.back()->GetNextLogNumber());
edit->SetColumnFamily(cfd->GetID());
std::vector<uint64_t> logs_to_delete;
for (auto mem : mems) {
@ -1203,7 +1228,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
}
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table(mems, edit, &file_number);
Status s = WriteLevel0Table(cfd, mems, edit, &file_number);
if (s.ok() && shutting_down_.Acquire_Load()) {
s = Status::IOError(
@ -1212,13 +1237,13 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
}
// Replace immutable memtable with the generated Table
s = default_cfd_->imm()->InstallMemtableFlushResults(
default_cfd_, mems, versions_.get(), s, &mutex_, options_.info_log.get(),
s = cfd->imm()->InstallMemtableFlushResults(
cfd, mems, versions_.get(), s, &mutex_, options_.info_log.get(),
file_number, pending_outputs_, &deletion_state.memtables_to_free,
db_directory_.get());
if (s.ok()) {
InstallSuperVersion(default_cfd_, deletion_state);
InstallSuperVersion(cfd, deletion_state);
if (madeProgress) {
*madeProgress = 1;
}
@ -1239,7 +1264,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
const Slice* begin, const Slice* end,
bool reduce_level, int target_level) {
Status s = FlushMemTable(FlushOptions());
Status s = FlushMemTable(default_cfd_, FlushOptions());
if (!s.ok()) {
LogFlush(options_.info_log);
return s;
@ -1382,8 +1407,12 @@ uint64_t DBImpl::CurrentVersionNumber() const {
Status DBImpl::Flush(const FlushOptions& options,
const ColumnFamilyHandle& column_family) {
Status status = FlushMemTable(options);
return status;
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
mutex_.Unlock();
assert(cfd != nullptr);
return FlushMemTable(cfd, options);
}
SequenceNumber DBImpl::GetLatestSequenceNumber() const {
@ -1657,35 +1686,36 @@ Status DBImpl::TEST_CompactRange(int level,
return RunManualCompaction(level, output_level, begin, end);
}
Status DBImpl::FlushMemTable(const FlushOptions& options) {
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& options) {
// nullptr batch means just wait for earlier writes to be done
Status s = Write(WriteOptions(), nullptr);
if (s.ok() && options.wait) {
// Wait until the compaction completes
s = WaitForFlushMemTable();
s = WaitForFlushMemTable(cfd);
}
return s;
}
Status DBImpl::WaitForFlushMemTable() {
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
Status s;
// Wait until the compaction completes
MutexLock l(&mutex_);
while (default_cfd_->imm()->size() > 0 && bg_error_.ok()) {
while (cfd->imm()->size() > 0 && bg_error_.ok()) {
bg_cv_.Wait();
}
if (default_cfd_->imm()->size() != 0) {
if (!bg_error_.ok()) {
s = bg_error_;
}
return s;
}
Status DBImpl::TEST_FlushMemTable() {
return FlushMemTable(FlushOptions());
return FlushMemTable(default_cfd_, FlushOptions());
}
Status DBImpl::TEST_WaitForFlushMemTable() {
return WaitForFlushMemTable();
return WaitForFlushMemTable(default_cfd_);
}
Status DBImpl::TEST_WaitForCompact() {
@ -1710,19 +1740,31 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else {
bool is_flush_pending = default_cfd_->imm()->IsFlushPending();
bool is_flush_pending = false;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->imm()->IsFlushPending()) {
is_flush_pending = true;
}
}
if (is_flush_pending &&
(bg_flush_scheduled_ < options_.max_background_flushes)) {
// memtable flush needed
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
}
bool is_compaction_needed = false;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->current()->NeedsCompaction()) {
is_compaction_needed = true;
break;
}
}
// Schedule BGWorkCompaction if there's a compaction pending (or a memtable
// flush, but the HIGH pool is not enabled). Do it only if
// max_background_compactions hasn't been reached and, in case
// bg_manual_only_ > 0, if it's a manual compaction.
if ((manual_compaction_ || default_cfd_->current()->NeedsCompaction() ||
if ((manual_compaction_ || is_compaction_needed ||
(is_flush_pending && (options_.max_background_flushes <= 0))) &&
bg_compaction_scheduled_ < options_.max_background_compactions &&
(!bg_manual_only_ || manual_compaction_)) {
@ -1744,11 +1786,14 @@ void DBImpl::BGWorkCompaction(void* db) {
Status DBImpl::BackgroundFlush(bool* madeProgress,
DeletionState& deletion_state) {
Status stat;
while (stat.ok() && default_cfd_->imm()->IsFlushPending()) {
Log(options_.info_log,
"BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d",
options_.max_background_flushes - bg_flush_scheduled_);
stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
for (auto cfd : *versions_->GetColumnFamilySet()) {
while (stat.ok() && cfd->imm()->IsFlushPending()) {
Log(options_.info_log,
"BackgroundCallFlush doing FlushMemTableToOutputFile with column "
"family %u, flush slots available %d",
cfd->GetID(), options_.max_background_flushes - bg_flush_scheduled_);
stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state);
}
}
return stat;
}
@ -1871,20 +1916,24 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
}
// TODO: remove memtable flush from formal compaction
while (default_cfd_->imm()->IsFlushPending()) {
Log(options_.info_log,
"BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
"available %d",
options_.max_background_compactions - bg_compaction_scheduled_);
Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
if (!stat.ok()) {
if (is_manual) {
manual_compaction_->status = stat;
manual_compaction_->done = true;
manual_compaction_->in_progress = false;
manual_compaction_ = nullptr;
for (auto cfd : *versions_->GetColumnFamilySet()) {
while (cfd->imm()->IsFlushPending()) {
Log(options_.info_log,
"BackgroundCompaction doing FlushMemTableToOutputFile with column "
"family %d, compaction slots available %d",
cfd->GetID(),
options_.max_background_compactions - bg_compaction_scheduled_);
Status stat =
FlushMemTableToOutputFile(cfd, madeProgress, deletion_state);
if (!stat.ok()) {
if (is_manual) {
manual_compaction_->status = stat;
manual_compaction_->done = true;
manual_compaction_->in_progress = false;
manual_compaction_ = nullptr;
}
return stat;
}
return stat;
}
}
@ -2285,7 +2334,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
LogFlush(options_.info_log);
mutex_.Lock();
if (default_cfd_->imm()->IsFlushPending()) {
FlushMemTableToOutputFile(nullptr, deletion_state);
FlushMemTableToOutputFile(default_cfd_, nullptr, deletion_state);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();

@ -286,7 +286,7 @@ class DBImpl : public DB {
// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status FlushMemTableToOutputFile(bool* madeProgress,
Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, bool* madeProgress,
DeletionState& deletion_state);
Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
@ -298,8 +298,8 @@ class DBImpl : public DB {
// for the entire period. The second method WriteLevel0Table supports
// concurrent flush memtables to storage.
Status WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit);
Status WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
uint64_t* filenumber);
Status WriteLevel0Table(ColumnFamilyData* cfd, std::vector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber);
uint64_t SlowdownAmount(int n, double bottom, double top);
Status MakeRoomForWrite(ColumnFamilyData* cfd,
@ -308,10 +308,10 @@ class DBImpl : public DB {
autovector<WriteBatch*>* write_batch_group);
// Force current memtable contents to be flushed.
Status FlushMemTable(const FlushOptions& options);
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options);
// Wait for memtable flushed
Status WaitForFlushMemTable();
Status WaitForFlushMemTable(ColumnFamilyData* cfd);
void MaybeScheduleLogDBDeployStats();
static void BGLogDBDeployStats(void* db);

@ -1130,10 +1130,12 @@ struct VersionSet::ManifestWriter {
Status status;
bool done;
port::CondVar cv;
ColumnFamilyData* cfd;
VersionEdit* edit;
explicit ManifestWriter(port::Mutex* mu, VersionEdit* e) :
done(false), cv(mu), edit(e) {}
explicit ManifestWriter(port::Mutex* mu, ColumnFamilyData* cfd,
VersionEdit* e)
: done(false), cv(mu), cfd(cfd), edit(e) {}
};
// A helper class so we can efficiently apply a whole sequence
@ -1374,7 +1376,6 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
next_file_number_(2),
manifest_file_number_(0), // Filled by Recover()
last_sequence_(0),
log_number_(0),
prev_log_number_(0),
num_levels_(options_->num_levels),
current_version_number_(0),
@ -1428,7 +1429,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
mu->AssertHeld();
// queue our request
ManifestWriter w(mu, edit);
ManifestWriter w(mu, column_family_data, edit);
manifest_writers_.push_back(&w);
while (!w.done && &w != manifest_writers_.front()) {
w.cv.Wait();
@ -1447,8 +1448,12 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
assert(manifest_writers_.front() == &w);
std::deque<ManifestWriter*>::iterator iter = manifest_writers_.begin();
for (; iter != manifest_writers_.end(); ++iter) {
if ((*iter)->cfd->GetID() != column_family_data->GetID()) {
// group commits across column families are not yet supported
break;
}
last_writer = *iter;
LogAndApplyHelper(&builder, v, last_writer->edit, mu);
LogAndApplyHelper(column_family_data, &builder, v, last_writer->edit, mu);
batch_edits.push_back(last_writer->edit);
}
builder.SaveTo(v);
@ -1564,7 +1569,6 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (s.ok()) {
manifest_file_size_ = new_manifest_file_size;
AppendVersion(column_family_data, v);
log_number_ = edit->log_number_;
column_family_data->SetLogNumber(edit->log_number_);
prev_log_number_ = edit->prev_log_number_;
@ -1596,15 +1600,16 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
return s;
}
void VersionSet::LogAndApplyHelper(Builder* builder, Version* v,
VersionEdit* edit, port::Mutex* mu) {
void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder,
Version* v, VersionEdit* edit,
port::Mutex* mu) {
mu->AssertHeld();
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ >= cfd->GetLogNumber());
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_);
edit->SetLogNumber(cfd->GetLogNumber());
}
if (!edit->has_prev_log_number_) {
@ -1754,6 +1759,7 @@ Status VersionSet::Recover(
if (edit.has_log_number_) {
cfd->SetLogNumber(edit.log_number_);
have_log_number = true;
}
// if it is not column family add or column family drop,
@ -1764,11 +1770,6 @@ Status VersionSet::Recover(
builder->second->Apply(&edit);
}
if (edit.has_log_number_) {
log_number = edit.log_number_;
have_log_number = true;
}
if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
@ -1828,7 +1829,6 @@ Status VersionSet::Recover(
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
Log(options_->info_log, "Recovered from manifest file:%s succeeded,"
@ -1839,7 +1839,7 @@ Status VersionSet::Recover(
(unsigned long)manifest_file_number_,
(unsigned long)next_file_number_,
(unsigned long)last_sequence_,
(unsigned long)log_number_,
(unsigned long)log_number,
(unsigned long)prev_log_number_);
for (auto cfd : *column_family_set_) {
@ -2041,7 +2041,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
}
if (edit.has_log_number_) {
log_number = edit.log_number_;
log_number = std::max(log_number, edit.log_number_);
have_log_number = true;
}
@ -2090,7 +2090,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
printf("manifest_file_number %lu next_file_number %lu last_sequence "

@ -344,10 +344,6 @@ class VersionSet {
// Mark the specified file number as used.
void MarkFileNumberUsed(uint64_t number);
// Return the current log file number. This is the biggest log_number from
// all column families
uint64_t LogNumber() const { return log_number_; }
// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
uint64_t PrevLogNumber() const { return prev_log_number_; }
@ -468,7 +464,6 @@ class VersionSet {
uint64_t next_file_number_;
uint64_t manifest_file_number_;
std::atomic<uint64_t> last_sequence_;
uint64_t log_number_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
int num_levels_;
@ -502,8 +497,8 @@ class VersionSet {
VersionSet(const VersionSet&);
void operator=(const VersionSet&);
void LogAndApplyHelper(Builder*b, Version* v,
VersionEdit* edit, port::Mutex* mu);
void LogAndApplyHelper(ColumnFamilyData* cfd, Builder* b, Version* v,
VersionEdit* edit, port::Mutex* mu);
};
} // namespace rocksdb

Loading…
Cancel
Save