LogAndApply to take ColumnFamilyData

Summary: This removes the default implementation of LogAndApply that applied the changed to the default column family by default. It is mostly simple reformatting.

Test Plan: make check

Reviewers: dhruba, kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15465
main
Igor Canadi 11 years ago
parent eb055609e4
commit 511b03a5b5
  1. 26
      db/db_impl.cc
  2. 5
      db/db_test.cc
  3. 6
      db/memtablelist.cc
  4. 14
      db/memtablelist.h
  5. 3
      db/version_set.cc
  6. 7
      db/version_set.h

@ -1009,7 +1009,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
// Since we already recovered log_number, we want all logs
// with numbers `<= log_number` (includes this one) to be ignored
edit.SetLogNumber(log_number + 1);
status = versions_->LogAndApply(&edit, &mutex_);
status = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
}
return status;
@ -1204,8 +1204,9 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
// Replace immutable memtable with the generated Table
s = default_cfd_->imm.InstallMemtableFlushResults(
mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number,
pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get());
default_cfd_, mems, versions_.get(), s, &mutex_, options_.info_log.get(),
file_number, pending_outputs_, &deletion_state.memtables_to_free,
db_directory_.get());
if (s.ok()) {
InstallSuperVersion(default_cfd_, deletion_state);
@ -1333,7 +1334,8 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
Log(options_.info_log, "Apply version edit:\n%s",
edit.DebugString().data());
status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
status = versions_->LogAndApply(default_cfd_, &edit, &mutex_,
db_directory_.get());
superversion_to_free = InstallSuperVersion(default_cfd_, new_superversion);
new_superversion = nullptr;
@ -1906,7 +1908,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get());
status = versions_->LogAndApply(default_cfd_, c->edit(), &mutex_,
db_directory_.get());
InstallSuperVersion(default_cfd_, deletion_state);
Version::LevelSummaryStorage tmp;
@ -2155,8 +2158,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
compact->compaction->output_level(), out.number, out.file_size,
out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_,
db_directory_.get());
return versions_->LogAndApply(default_cfd_, compact->compaction->edit(),
&mutex_, db_directory_.get());
}
//
@ -2949,7 +2952,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
edit.AddColumnFamily(column_family_name);
handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
edit.SetColumnFamily(handle->id);
Status s = versions_->LogAndApply(&edit, &mutex_);
Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
if (s.ok()) {
// add to internal data structures
versions_->CreateColumnFamily(options, &edit);
@ -2968,7 +2971,7 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
VersionEdit edit;
edit.DropColumnFamily();
edit.SetColumnFamily(column_family.id);
Status s = versions_->LogAndApply(&edit, &mutex_);
Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
if (s.ok()) {
// remove from internal data structures
versions_->DropColumnFamily(&edit);
@ -3830,7 +3833,8 @@ Status DBImpl::DeleteFile(std::string name) {
}
}
edit.DeleteFile(level, number);
status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
status = versions_->LogAndApply(default_cfd_, &edit, &mutex_,
db_directory_.get());
if (status.ok()) {
InstallSuperVersion(default_cfd_, deletion_state);
}
@ -3977,7 +3981,7 @@ Status DB::OpenWithColumnFamilies(
edit.SetLogNumber(new_log_number);
impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(std::move(lfile)));
s = impl->versions_->LogAndApply(&edit, &impl->mutex_,
s = impl->versions_->LogAndApply(impl->default_cfd_, &edit, &impl->mutex_,
impl->db_directory_.get());
}
if (s.ok()) {

@ -5046,6 +5046,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
std::vector<ColumnFamilyDescriptor> dummy;
dummy.push_back(ColumnFamilyDescriptor());
ASSERT_OK(vset.Recover(dummy));
auto default_cfd = vset.GetColumnFamilySet()->GetDefault();
VersionEdit vbase;
uint64_t fnum = 1;
for (int i = 0; i < num_base_files; i++) {
@ -5053,7 +5054,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vbase.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1);
}
ASSERT_OK(vset.LogAndApply(&vbase, &mu));
ASSERT_OK(vset.LogAndApply(default_cfd, &vbase, &mu));
uint64_t start_micros = env->NowMicros();
@ -5063,7 +5064,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vedit.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1);
vset.LogAndApply(&vedit, &mu);
vset.LogAndApply(default_cfd, &vedit, &mu);
}
uint64_t stop_micros = env->NowMicros();
unsigned int us = stop_micros - start_micros;

@ -122,8 +122,8 @@ void MemTableList::PickMemtablesToFlush(std::vector<MemTable*>* ret) {
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
const std::vector<MemTable*>& mems, VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
ColumnFamilyData* cfd, const std::vector<MemTable*>& mems, VersionSet* vset,
Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number,
std::set<uint64_t>& pending_outputs, std::vector<MemTable*>* to_delete,
Directory* db_directory) {
mu->AssertHeld();
@ -177,7 +177,7 @@ Status MemTableList::InstallMemtableFlushResults(
(unsigned long)m->file_number_);
// this can release and reacquire the mutex.
s = vset->LogAndApply(&m->edit_, mu, db_directory);
s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable

@ -8,6 +8,7 @@
#include <string>
#include <list>
#include <vector>
#include <set>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/iterator.h"
@ -17,6 +18,7 @@
namespace rocksdb {
class ColumnFamilyData;
class InternalKeyComparator;
class Mutex;
@ -90,13 +92,11 @@ class MemTableList {
void PickMemtablesToFlush(std::vector<MemTable*>* mems);
// Commit a successful flush in the manifest file
Status InstallMemtableFlushResults(const std::vector<MemTable*>& m,
VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete,
Directory* db_directory);
Status InstallMemtableFlushResults(
ColumnFamilyData* cfd, const std::vector<MemTable*>& m, VersionSet* vset,
Status flushStatus, port::Mutex* mu, Logger* info_log,
uint64_t file_number, std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete, Directory* db_directory);
// New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add().

@ -1973,7 +1973,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
VersionEdit ve;
port::Mutex dummy_mutex;
MutexLock l(&dummy_mutex);
return versions.LogAndApply(&ve, &dummy_mutex, nullptr, true);
return versions.LogAndApply(versions.GetColumnFamilySet()->GetDefault(), &ve,
&dummy_mutex, nullptr, true);
}
Status VersionSet::DumpManifest(Options& options, std::string& dscname,

@ -292,13 +292,6 @@ class VersionSet {
port::Mutex* mu, Directory* db_directory = nullptr,
bool new_descriptor_log = false);
Status LogAndApply(VersionEdit* edit, port::Mutex* mu,
Directory* db_directory = nullptr,
bool new_descriptor_log = false) {
return LogAndApply(column_family_set_->GetDefault(), edit, mu, db_directory,
new_descriptor_log);
}
// Recover the last saved descriptor from persistent storage.
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families);

Loading…
Cancel
Save