Change ColumnFamilyData from struct to class

Summary: ColumnFamilyData grew a lot, there's much more data that it holds now. It makes more sense to encapsulate it better by making it a class.

Test Plan: make check

Reviewers: dhruba, haobo, kailiu, sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15579
main
Igor Canadi 11 years ago
parent 15999e728e
commit fa99d53e55
  1. 68
      db/column_family.cc
  2. 67
      db/column_family.h
  3. 2
      db/db_filesnapshot.cc
  4. 154
      db/db_impl.cc
  5. 15
      db/db_impl.h
  6. 4
      db/db_impl_readonly.cc
  7. 2
      db/db_stats_logger.cc
  8. 66
      db/version_set.cc
  9. 6
      db/version_set.h
  10. 2
      util/ldb_cmd.cc

@ -62,45 +62,59 @@ void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions,
const ColumnFamilyOptions& options)
: id(id),
name(name),
dummy_versions(dummy_versions),
current(nullptr),
options(options),
mem(nullptr),
imm(options.min_write_buffer_number_to_merge),
super_version(nullptr),
log_number(0) {}
: id_(id),
name_(name),
dummy_versions_(dummy_versions),
current_(nullptr),
options_(options),
mem_(nullptr),
imm_(options.min_write_buffer_number_to_merge),
super_version_(nullptr),
super_version_number_(0),
log_number_(0) {}
ColumnFamilyData::~ColumnFamilyData() {
if (super_version != nullptr) {
if (super_version_ != nullptr) {
bool is_last_reference __attribute__((unused));
is_last_reference = super_version->Unref();
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version->Cleanup();
delete super_version;
super_version_->Cleanup();
delete super_version_;
}
// List must be empty
assert(dummy_versions->next_ == dummy_versions);
delete dummy_versions;
assert(dummy_versions_->next_ == dummy_versions_);
delete dummy_versions_;
if (mem != nullptr) {
delete mem->Unref();
if (mem_ != nullptr) {
delete mem_->Unref();
}
std::vector<MemTable*> to_delete;
imm.current()->Unref(&to_delete);
imm_.current()->Unref(&to_delete);
for (MemTable* m : to_delete) {
delete m;
}
}
void ColumnFamilyData::CreateNewMemtable() {
assert(current != nullptr);
if (mem != nullptr) {
delete mem->Unref();
assert(current_ != nullptr);
if (mem_ != nullptr) {
delete mem_->Unref();
}
mem = new MemTable(current->vset_->icmp_, options);
mem->Ref();
mem_ = new MemTable(current_->vset_->icmp_, options_);
mem_->Ref();
}
SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion) {
new_superversion->Init(mem_, imm_.current(), current_);
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
++super_version_number_;
if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup();
return old_superversion; // will let caller delete outside of mutex
}
return nullptr;
}
ColumnFamilySet::ColumnFamilySet() : max_column_family_(0) {}
@ -162,8 +176,8 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
void ColumnFamilySet::DropColumnFamily(uint32_t id) {
auto cfd = column_family_data_.find(id);
assert(cfd != column_family_data_.end());
column_families_.erase(cfd->second->name);
cfd->second->current->Unref();
column_families_.erase(cfd->second->GetName());
cfd->second->current()->Unref();
droppped_column_families_.push_back(cfd->second);
column_family_data_.erase(cfd);
}
@ -175,8 +189,8 @@ MemTable* ColumnFamilyMemTablesImpl::GetMemTable(uint32_t column_family_id) {
// API change in WriteBatch::Handler, which is a public API
assert(cfd != nullptr);
if (log_number_ == 0 || log_number_ >= cfd->log_number) {
return cfd->mem;
if (log_number_ == 0 || log_number_ >= cfd->GetLogNumber()) {
return cfd->mem();
} else {
return nullptr;
}

@ -52,30 +52,63 @@ struct SuperVersion {
Version* new_current);
};
// column family metadata
struct ColumnFamilyData {
uint32_t id;
std::string name;
Version* dummy_versions; // Head of circular doubly-linked list of versions.
Version* current; // == dummy_versions->prev_
ColumnFamilyOptions options;
MemTable* mem;
MemTableList imm;
SuperVersion* super_version;
// This is the earliest log file number that contains data from this
// Column Family. All earlier log files must be ignored and not
// recovered from
uint64_t log_number;
// column family metadata. not thread-safe. should be protected by db_mutex
class ColumnFamilyData {
public:
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, const ColumnFamilyOptions& options);
~ColumnFamilyData();
uint32_t GetID() const { return id_; }
const std::string& GetName() { return name_; }
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
uint64_t GetLogNumber() const { return log_number_; }
ColumnFamilyOptions* options() { return &options_; }
MemTableList* imm() { return &imm_; }
MemTable* mem() { return mem_; }
Version* current() { return current_; }
Version* dummy_versions() { return dummy_versions_; }
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
void SetCurrent(Version* current) { current_ = current; }
void CreateNewMemtable();
SuperVersion* GetSuperVersion() const { return super_version_; }
uint64_t GetSuperVersionNumber() const {
return super_version_number_.load();
}
// will return a pointer to SuperVersion* if previous SuperVersion
// if its reference count is zero and needs deletion or nullptr if not
// As argument takes a pointer to allocated SuperVersion to enable
// the clients to allocate SuperVersion outside of mutex.
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion);
private:
uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions->prev_
ColumnFamilyOptions options_;
MemTable* mem_;
MemTableList imm_;
SuperVersion* super_version_;
// An ordinal representing the current SuperVersion. Updated by
// InstallSuperVersion(), i.e. incremented every time super_version_
// changes.
std::atomic<uint64_t> super_version_number_;
// This is the earliest log file number that contains data from this
// Column Family. All earlier log files must be ignored and not
// recovered from
uint64_t log_number_;
};
// Thread safe only for reading without a writer. All access should be
// locked when adding or dropping column family
class ColumnFamilySet {
public:
class iterator {

@ -74,7 +74,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
// Make a set of all of the live *.sst files
std::set<uint64_t> live;
default_cfd_->current->AddLiveFiles(&live);
default_cfd_->current()->AddLiveFiles(&live);
ret.clear();
ret.reserve(live.size() + 2); //*.sst + CURRENT + MANIFEST

@ -267,7 +267,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
shutting_down_(nullptr),
bg_cv_(&mutex_),
logfile_number_(0),
super_version_number_(0),
tmp_batch_(),
bg_compaction_scheduled_(0),
bg_manual_only_(0),
@ -331,7 +330,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
DBImpl::~DBImpl() {
// Wait for background work to finish
if (flush_on_destroy_ && default_cfd_->mem->GetFirstSequenceNumber() != 0) {
if (flush_on_destroy_ && default_cfd_->mem()->GetFirstSequenceNumber() != 0) {
FlushMemTable(FlushOptions());
}
mutex_.Lock();
@ -930,8 +929,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
std::unordered_map<int, VersionEdit> version_edits;
for (auto cfd : *versions_->GetColumnFamilySet()) {
VersionEdit edit;
edit.SetColumnFamily(cfd->id);
version_edits.insert({cfd->id, edit});
edit.SetColumnFamily(cfd->GetID());
version_edits.insert({cfd->GetID(), edit});
}
// Open the log file
@ -991,12 +990,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
if (!read_only) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem->ApproximateMemoryUsage() >
cfd->options.write_buffer_size) {
auto iter = version_edits.find(cfd->id);
if (cfd->mem()->ApproximateMemoryUsage() >
cfd->options()->write_buffer_size) {
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
status = WriteLevel0TableForRecovery(cfd->mem, edit);
status = WriteLevel0TableForRecovery(cfd->mem(), edit);
// we still want to clear the memtable, even if the recovery failed
cfd->CreateNewMemtable();
if (!status.ok()) {
@ -1011,12 +1010,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
if (!read_only) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
auto iter = version_edits.find(cfd->id);
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
// flush the final memtable
status = WriteLevel0TableForRecovery(cfd->mem, edit);
status = WriteLevel0TableForRecovery(cfd->mem(), edit);
// we still want to clear the memtable, even if the recovery failed
cfd->CreateNewMemtable();
if (!status.ok()) {
@ -1104,7 +1103,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
const SequenceNumber earliest_seqno_in_memtable =
mems[0]->GetFirstSequenceNumber();
Version* base = default_cfd_->current;
Version* base = default_cfd_->current();
base->Ref(); // it is likely that we do not need this reference
Status s;
{
@ -1141,7 +1140,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
// re-acquire the most current version
base = default_cfd_->current;
base = default_cfd_->current();
// There could be multiple threads writing to its own level-0 file.
// The pending_outputs cannot be cleared here, otherwise this newly
@ -1182,9 +1181,9 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
DeletionState& deletion_state) {
mutex_.AssertHeld();
assert(default_cfd_->imm.size() != 0);
assert(default_cfd_->imm()->size() != 0);
if (!default_cfd_->imm.IsFlushPending()) {
if (!default_cfd_->imm()->IsFlushPending()) {
Log(options_.info_log, "FlushMemTableToOutputFile already in progress");
Status s = Status::IOError("FlushMemTableToOutputFile already in progress");
return s;
@ -1193,7 +1192,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
// Save the contents of the earliest memtable as a new Table
uint64_t file_number;
std::vector<MemTable*> mems;
default_cfd_->imm.PickMemtablesToFlush(&mems);
default_cfd_->imm()->PickMemtablesToFlush(&mems);
if (mems.empty()) {
Log(options_.info_log, "Nothing in memstore to flush");
Status s = Status::IOError("Nothing in memstore to flush");
@ -1228,7 +1227,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
}
// Replace immutable memtable with the generated Table
s = default_cfd_->imm.InstallMemtableFlushResults(
s = default_cfd_->imm()->InstallMemtableFlushResults(
default_cfd_, mems, versions_.get(), s, &mutex_, options_.info_log.get(),
file_number, pending_outputs_, &deletion_state.memtables_to_free,
db_directory_.get());
@ -1264,7 +1263,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
Version* base = default_cfd_->current;
Version* base = default_cfd_->current();
for (int level = 1; level < NumberLevels(); level++) {
if (base->OverlapInLevel(level, begin, end)) {
max_level_with_files = level;
@ -1297,7 +1296,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(int level) {
mutex_.AssertHeld();
Version* current = default_cfd_->current;
Version* current = default_cfd_->current();
int minimum_level = level;
for (int i = level - 1; i > 0; --i) {
// stop if level i is not empty
@ -1348,10 +1347,10 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
Status status;
if (to_level < level) {
Log(options_.info_log, "Before refitting:\n%s",
default_cfd_->current->DebugString().data());
default_cfd_->current()->DebugString().data());
VersionEdit edit;
for (const auto& f : default_cfd_->current->files_[level]) {
for (const auto& f : default_cfd_->current()->files_[level]) {
edit.DeleteFile(level, f->number);
edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
@ -1361,14 +1360,14 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
status = versions_->LogAndApply(default_cfd_, &edit, &mutex_,
db_directory_.get());
superversion_to_free = InstallSuperVersion(default_cfd_, new_superversion);
superversion_to_free = default_cfd_->InstallSuperVersion(new_superversion);
new_superversion = nullptr;
Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());
if (status.ok()) {
Log(options_.info_log, "After refitting:\n%s",
default_cfd_->current->DebugString().data());
default_cfd_->current()->DebugString().data());
}
}
@ -1394,7 +1393,7 @@ int DBImpl::Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) {
}
uint64_t DBImpl::CurrentVersionNumber() const {
return super_version_number_.load();
return default_cfd_->GetSuperVersionNumber();
}
Status DBImpl::Flush(const FlushOptions& options,
@ -1688,10 +1687,10 @@ Status DBImpl::WaitForFlushMemTable() {
Status s;
// Wait until the compaction completes
MutexLock l(&mutex_);
while (default_cfd_->imm.size() > 0 && bg_error_.ok()) {
while (default_cfd_->imm()->size() > 0 && bg_error_.ok()) {
bg_cv_.Wait();
}
if (default_cfd_->imm.size() != 0) {
if (default_cfd_->imm()->size() != 0) {
s = bg_error_;
}
return s;
@ -1727,7 +1726,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else {
bool is_flush_pending = default_cfd_->imm.IsFlushPending();
bool is_flush_pending = default_cfd_->imm()->IsFlushPending();
if (is_flush_pending &&
(bg_flush_scheduled_ < options_.max_background_flushes)) {
// memtable flush needed
@ -1739,7 +1738,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
// flush, but the HIGH pool is not enabled). Do it only if
// max_background_compactions hasn't been reached and, in case
// bg_manual_only_ > 0, if it's a manual compaction.
if ((manual_compaction_ || default_cfd_->current->NeedsCompaction() ||
if ((manual_compaction_ || default_cfd_->current()->NeedsCompaction() ||
(is_flush_pending && (options_.max_background_flushes <= 0))) &&
bg_compaction_scheduled_ < options_.max_background_compactions &&
(!bg_manual_only_ || manual_compaction_)) {
@ -1761,7 +1760,7 @@ void DBImpl::BGWorkCompaction(void* db) {
Status DBImpl::BackgroundFlush(bool* madeProgress,
DeletionState& deletion_state) {
Status stat;
while (stat.ok() && default_cfd_->imm.IsFlushPending()) {
while (stat.ok() && default_cfd_->imm()->IsFlushPending()) {
Log(options_.info_log,
"BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d",
options_.max_background_flushes - bg_flush_scheduled_);
@ -1818,7 +1817,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() {
uint64_t DBImpl::TEST_GetLevel0TotalSize() {
MutexLock l(&mutex_);
return default_cfd_->current->NumLevelBytes(0);
return default_cfd_->current()->NumLevelBytes(0);
}
void DBImpl::BackgroundCallCompaction() {
@ -1881,7 +1880,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
mutex_.AssertHeld();
// TODO: remove memtable flush from formal compaction
while (default_cfd_->imm.IsFlushPending()) {
while (default_cfd_->imm()->IsFlushPending()) {
Log(options_.info_log,
"BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
"available %d",
@ -1940,7 +1939,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), default_cfd_->current->LevelSummary(&tmp));
status.ToString().c_str(), default_cfd_->current()->LevelSummary(&tmp));
versions_->ReleaseCompactionFiles(c.get(), status);
*madeProgress = true;
} else {
@ -2287,11 +2286,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
// TODO: remove memtable flush from normal compaction work
if (default_cfd_->imm.imm_flush_needed.NoBarrier_Load() != nullptr) {
if (default_cfd_->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
LogFlush(options_.info_log);
mutex_.Lock();
if (default_cfd_->imm.IsFlushPending()) {
if (default_cfd_->imm()->IsFlushPending()) {
FlushMemTableToOutputFile(nullptr, deletion_state);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
@ -2669,11 +2668,11 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
// Collect together all needed child iterators for mem
mutex_.Lock();
*latest_snapshot = versions_->LastSequence();
mutable_mem = default_cfd_->mem;
mutable_mem = default_cfd_->mem();
mutable_mem->Ref();
immutable_mems = default_cfd_->imm.current();
immutable_mems = default_cfd_->imm()->current();
immutable_mems->Ref();
version = default_cfd_->current;
version = default_cfd_->current();
version->Ref();
mutex_.Unlock();
@ -2710,11 +2709,11 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
// get all child iterators and bump their refcounts under lock
mutex_.Lock();
mutable_mem = default_cfd_->mem;
mutable_mem = default_cfd_->mem();
mutable_mem->Ref();
immutable_mems = default_cfd_->imm.current();
immutable_mems = default_cfd_->imm()->current();
immutable_mems->Ref();
version = default_cfd_->current;
version = default_cfd_->current();
version->Ref();
if (superversion_number != nullptr) {
*superversion_number = CurrentVersionNumber();
@ -2756,7 +2755,7 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
MutexLock l(&mutex_);
return default_cfd_->current->MaxNextLevelOverlappingBytes();
return default_cfd_->current()->MaxNextLevelOverlappingBytes();
}
Status DBImpl::Get(const ReadOptions& options,
@ -2777,11 +2776,12 @@ Status DBImpl::Get(const ReadOptions& options,
// for superversion_to_free
void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
DeletionState& deletion_state) {
mutex_.AssertHeld();
// if new_superversion == nullptr, it means somebody already used it
SuperVersion* new_superversion =
(deletion_state.new_superversion != nullptr) ?
deletion_state.new_superversion : new SuperVersion();
SuperVersion* old_superversion = InstallSuperVersion(cfd, new_superversion);
SuperVersion* old_superversion = cfd->InstallSuperVersion(new_superversion);
deletion_state.new_superversion = nullptr;
if (deletion_state.superversion_to_free != nullptr) {
// somebody already put it there
@ -2791,23 +2791,6 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
}
}
SuperVersion* DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
SuperVersion* new_superversion) {
mutex_.AssertHeld();
new_superversion->Init(cfd->mem, cfd->imm.current(), cfd->current);
SuperVersion* old_superversion = cfd->super_version;
cfd->super_version = new_superversion;
if (cfd->id == 0) {
// TODO this is only for default column family for now
++super_version_number_;
}
if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup();
return old_superversion; // will let caller delete outside of mutex
}
return nullptr;
}
Status DBImpl::GetImpl(const ReadOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value,
@ -2819,7 +2802,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// this is asserting because client calling Get() with undefined
// ColumnFamilyHandle is undefined behavior.
assert(cfd != nullptr);
SuperVersion* get_version = cfd->super_version->Ref();
SuperVersion* get_version = cfd->GetSuperVersion()->Ref();
mutex_.Unlock();
SequenceNumber snapshot;
@ -2899,9 +2882,9 @@ std::vector<Status> DBImpl::MultiGet(
}
// TODO only works for default column family
MemTable* mem = default_cfd_->mem;
MemTableListVersion* imm = default_cfd_->imm.current();
Version* current = default_cfd_->current;
MemTable* mem = default_cfd_->mem();
MemTableListVersion* imm = default_cfd_->imm()->current();
Version* current = default_cfd_->current();
mem->Ref();
imm->Ref();
current->Ref();
@ -3340,7 +3323,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
uint64_t slowdown =
SlowdownAmount(default_cfd_->current->NumLevelFiles(0),
SlowdownAmount(default_cfd_->current()->NumLevelFiles(0),
options_.level0_slowdown_writes_trigger,
options_.level0_stop_writes_trigger);
mutex_.Unlock();
@ -3356,14 +3339,14 @@ Status DBImpl::MakeRoomForWrite(bool force,
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
delayed_writes_++;
} else if (!force && (default_cfd_->mem->ApproximateMemoryUsage() <=
} else if (!force && (default_cfd_->mem()->ApproximateMemoryUsage() <=
options_.write_buffer_size)) {
// There is room in current memtable
if (allow_delay) {
DelayLoggingAndReset();
}
break;
} else if (default_cfd_->imm.size() ==
} else if (default_cfd_->imm()->size() ==
options_.max_write_buffer_number - 1) {
// We have filled up the current memtable, but the previous
// ones are still being compacted, so we wait.
@ -3380,7 +3363,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
STALL_MEMTABLE_COMPACTION_MICROS, stall);
stall_memtable_compaction_ += stall;
stall_memtable_compaction_count_++;
} else if (default_cfd_->current->NumLevelFiles(0) >=
} else if (default_cfd_->current()->NumLevelFiles(0) >=
options_.level0_stop_writes_trigger) {
// There are too many level-0 files.
DelayLoggingAndReset();
@ -3396,10 +3379,10 @@ Status DBImpl::MakeRoomForWrite(bool force,
stall_level0_num_files_ += stall;
stall_level0_num_files_count_++;
} else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 &&
(score = default_cfd_->current->MaxCompactionScore()) >
(score = default_cfd_->current()->MaxCompactionScore()) >
options_.hard_rate_limit) {
// Delay a write when the compaction score for any level is too large.
int max_level = default_cfd_->current->MaxCompactionScoreLevel();
int max_level = default_cfd_->current()->MaxCompactionScoreLevel();
mutex_.Unlock();
uint64_t delayed;
{
@ -3422,7 +3405,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
}
mutex_.Lock();
} else if (allow_soft_rate_limit_delay && options_.soft_rate_limit > 0.0 &&
(score = default_cfd_->current->MaxCompactionScore()) >
(score = default_cfd_->current()->MaxCompactionScore()) >
options_.soft_rate_limit) {
// Delay a write when the compaction score for any level is too large.
// TODO: add statistics
@ -3473,21 +3456,20 @@ Status DBImpl::MakeRoomForWrite(bool force,
}
logfile_number_ = new_log_number;
log_.reset(new log::Writer(std::move(lfile)));
default_cfd_->mem->SetNextLogNumber(logfile_number_);
default_cfd_->imm.Add(default_cfd_->mem);
default_cfd_->mem()->SetNextLogNumber(logfile_number_);
default_cfd_->imm()->Add(default_cfd_->mem());
if (force) {
default_cfd_->imm.FlushRequested();
default_cfd_->imm()->FlushRequested();
}
default_cfd_->mem = memtmp;
default_cfd_->mem->Ref();
Log(options_.info_log,
"New memtable created with log file: #%lu\n",
memtmp->Ref();
memtmp->SetLogNumber(logfile_number_);
default_cfd_->SetMemtable(memtmp);
Log(options_.info_log, "New memtable created with log file: #%lu\n",
(unsigned long)logfile_number_);
default_cfd_->mem->SetLogNumber(logfile_number_);
force = false; // Do not force another compaction if have room
MaybeScheduleFlushOrCompaction();
*superversion_to_free =
InstallSuperVersion(default_cfd_, new_superversion);
default_cfd_->InstallSuperVersion(new_superversion);
}
}
return s;
@ -3511,7 +3493,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
value->clear();
MutexLock l(&mutex_);
Version* current = default_cfd_->current;
Version* current = default_cfd_->current();
Slice in = property;
Slice prefix("rocksdb.");
if (!in.starts_with(prefix)) return false;
@ -3792,10 +3774,10 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
return true;
} else if (in == "sstables") {
*value = default_cfd_->current->DebugString();
*value = default_cfd_->current()->DebugString();
return true;
} else if (in == "num-immutable-mem-table") {
*value = std::to_string(default_cfd_->imm.size());
*value = std::to_string(default_cfd_->imm()->size());
return true;
}
@ -3808,7 +3790,7 @@ void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family,
Version* v;
{
MutexLock l(&mutex_);
v = default_cfd_->current;
v = default_cfd_->current();
v->Ref();
}
@ -3885,7 +3867,7 @@ Status DBImpl::DeleteFile(std::string name) {
// This is to make sure that any deletion tombstones are not
// lost. Check that the level passed is the last level.
for (int i = level + 1; i < maxlevel; i++) {
if (cfd->current->NumLevelFiles(i) != 0) {
if (cfd->current()->NumLevelFiles(i) != 0) {
Log(options_.info_log,
"DeleteFile %s FAILED. File not in last level\n", name.c_str());
return Status::InvalidArgument("File not in last level");
@ -4060,8 +4042,8 @@ Status DB::OpenWithColumnFamilies(
}
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
delete impl->InstallSuperVersion(cfd, new SuperVersion());
cfd->mem->SetLogNumber(impl->logfile_number_);
delete cfd->InstallSuperVersion(new SuperVersion());
cfd->mem()->SetLogNumber(impl->logfile_number_);
}
impl->DeleteObsoleteFiles();
impl->MaybeScheduleFlushOrCompaction();
@ -4071,7 +4053,7 @@ Status DB::OpenWithColumnFamilies(
}
if (s.ok() && impl->options_.compaction_style == kCompactionStyleUniversal) {
Version* current = impl->default_cfd_->current;
Version* current = impl->default_cfd_->current();
for (int i = 1; i < impl->NumberLevels(); i++) {
int num_files = current->NumLevelFiles(i);
if (num_files > 0) {

@ -401,11 +401,6 @@ class DBImpl : public DB {
ColumnFamilyData* default_cfd_;
unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
// An ordinal representing the current SuperVersion. Updated by
// InstallSuperVersion(), i.e. incremented every time super_version_
// changes.
std::atomic<uint64_t> super_version_number_;
std::string host_name_;
std::unique_ptr<Directory> db_directory_;
@ -587,16 +582,8 @@ class DBImpl : public DB {
std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
// will return a pointer to SuperVersion* if previous SuperVersion
// if its reference count is zero and needs deletion or nullptr if not
// As argument takes a pointer to allocated SuperVersion
// Foreground threads call this function directly (they don't carry
// deletion state and have to handle their own creation and deletion
// of SuperVersion)
SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd,
SuperVersion* new_superversion);
// Background threads call this function, which is just a wrapper around
// the InstallSuperVersion() function above. Background threads carry
// the cfd->InstallSuperVersion() function. Background threads carry
// deletion_state which can have new_superversion already allocated.
void InstallSuperVersion(ColumnFamilyData* cfd,
DeletionState& deletion_state);

@ -56,8 +56,8 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value) {
Status s;
MemTable* mem = GetDefaultColumnFamily()->mem;
Version* current = GetDefaultColumnFamily()->current;
MemTable* mem = GetDefaultColumnFamily()->mem();
Version* current = GetDefaultColumnFamily()->current();
SequenceNumber snapshot = versions_->LastSequence();
MergeContext merge_context;
LookupKey lkey(key, snapshot);

@ -65,7 +65,7 @@ void DBImpl::LogDBDeployStats() {
uint64_t file_total_size = 0;
uint32_t file_total_num = 0;
Version* current = default_cfd_->current;
Version* current = default_cfd_->current();
for (int i = 0; i < current->NumberLevels(); i++) {
file_total_num += current->NumLevelFiles(i);
file_total_size += current->NumLevelBytes(i);

@ -1390,7 +1390,7 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
VersionSet::~VersionSet() {
for (auto cfd : *column_family_set_) {
cfd->current->Unref();
cfd->current()->Unref();
}
// we need to delete column_family_set_ because its destructor depends on
// VersionSet
@ -1405,20 +1405,21 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Version* v) {
// Make "v" current
assert(v->refs_ == 0);
assert(v != column_family_data->current);
if (column_family_data->current != nullptr) {
assert(column_family_data->current->refs_ > 0);
column_family_data->current->Unref();
Version* current = column_family_data->current();
assert(v != current);
if (current != nullptr) {
assert(current->refs_ > 0);
current->Unref();
}
column_family_data->current = v;
column_family_data->SetCurrent(v);
need_slowdown_for_num_level0_files_ =
(options_->level0_slowdown_writes_trigger >= 0 &&
v->NumLevelFiles(0) >= options_->level0_slowdown_writes_trigger);
v->Ref();
// Append to linked list
v->prev_ = column_family_data->dummy_versions->prev_;
v->next_ = column_family_data->dummy_versions;
v->prev_ = column_family_data->dummy_versions()->prev_;
v->next_ = column_family_data->dummy_versions();
v->prev_->next_ = v;
v->next_->prev_ = v;
}
@ -1441,7 +1442,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
std::vector<VersionEdit*> batch_edits;
Version* v = new Version(this, current_version_number_++);
Builder builder(this, column_family_data->current);
Builder builder(this, column_family_data->current());
// process all requests in the queue
ManifestWriter* last_writer = &w;
@ -1567,7 +1568,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
manifest_file_size_ = new_manifest_file_size;
AppendVersion(column_family_data, v);
log_number_ = edit->log_number_;
column_family_data->log_number = edit->log_number_;
column_family_data->SetLogNumber(edit->log_number_);
prev_log_number_ = edit->prev_log_number_;
} else {
@ -1676,7 +1677,7 @@ Status VersionSet::Recover(
} else {
ColumnFamilyData* default_cfd =
CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
builders.insert({0, new Builder(this, default_cfd->current)});
builders.insert({0, new Builder(this, default_cfd->current())});
}
{
@ -1722,7 +1723,7 @@ Status VersionSet::Recover(
ColumnFamilyData* new_cfd =
CreateColumnFamily(cf_options->second, &edit);
builders.insert(
{edit.column_family_, new Builder(this, new_cfd->current)});
{edit.column_family_, new Builder(this, new_cfd->current())});
}
} else if (edit.is_column_family_drop_) {
if (cf_in_builders) {
@ -1748,14 +1749,14 @@ Status VersionSet::Recover(
auto cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// this should never happen since cf_in_builders is true
assert(cfd != nullptr);
if (edit.max_level_ >= cfd->current->NumberLevels()) {
if (edit.max_level_ >= cfd->current()->NumberLevels()) {
s = Status::InvalidArgument(
"db has more levels than options.num_levels");
break;
}
if (edit.has_log_number_) {
cfd->log_number = edit.log_number_;
cfd->SetLogNumber(edit.log_number_);
}
// if it is not column family add or column family drop,
@ -1817,7 +1818,7 @@ Status VersionSet::Recover(
if (s.ok()) {
for (auto cfd : *column_family_set_) {
Version* v = new Version(this, current_version_number_++);
builders[cfd->id]->SaveTo(v);
builders[cfd->GetID()]->SaveTo(v);
// Install recovered version
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
@ -1846,7 +1847,7 @@ Status VersionSet::Recover(
for (auto cfd : *column_family_set_) {
Log(options_->info_log, "Column family \"%s\", log number is %lu\n",
cfd->name.c_str(), cfd->log_number);
cfd->GetName().c_str(), cfd->GetLogNumber());
}
}
@ -1936,7 +1937,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
}
Version* current_version =
versions.GetColumnFamilySet()->GetDefault()->current;
versions.GetColumnFamilySet()->GetDefault()->current();
int current_levels = current_version->NumberLevels();
if (current_levels <= new_levels) {
@ -2009,7 +2010,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
uint64_t prev_log_number = 0;
int count = 0;
// TODO works only for default column family currently
VersionSet::Builder builder(this, column_family_set_->GetDefault()->current);
VersionSet::Builder builder(this,
column_family_set_->GetDefault()->current());
{
VersionSet::LogReporter reporter;
@ -2120,11 +2122,11 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
{
// Store column family info
VersionEdit edit;
if (cfd->id != 0) {
if (cfd->GetID() != 0) {
// default column family is always there,
// no need to explicitly write it
edit.AddColumnFamily(cfd->name);
edit.SetColumnFamily(cfd->id);
edit.AddColumnFamily(cfd->GetName());
edit.SetColumnFamily(cfd->GetID());
std::string record;
edit.EncodeTo(&record);
Status s = log->AddRecord(record);
@ -2137,10 +2139,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
{
// Save files
VersionEdit edit;
edit.SetColumnFamily(cfd->id);
edit.SetColumnFamily(cfd->GetID());
for (int level = 0; level < NumberLevels(); level++) {
for (const auto& f : cfd->current->files_[level]) {
for (const auto& f : cfd->current()->files_[level]) {
edit.AddFile(level,
f->number,
f->file_size,
@ -2150,7 +2152,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
f->largest_seqno);
}
}
edit.SetLogNumber(cfd->log_number);
edit.SetLogNumber(cfd->GetLogNumber());
std::string record;
edit.EncodeTo(&record);
Status s = log->AddRecord(record);
@ -2235,7 +2237,8 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
// pre-calculate space requirement
int64_t total_files = 0;
for (auto cfd : *column_family_set_) {
for (Version* v = cfd->dummy_versions->next_; v != cfd->dummy_versions;
Version* dummy_versions = cfd->dummy_versions();
for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
for (int level = 0; level < v->NumberLevels(); level++) {
total_files += v->files_[level].size();
@ -2247,7 +2250,8 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
live_list->reserve(live_list->size() + total_files);
for (auto cfd : *column_family_set_) {
for (Version* v = cfd->dummy_versions->next_; v != cfd->dummy_versions;
Version* dummy_versions = cfd->dummy_versions();
for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
for (int level = 0; level < v->NumberLevels(); level++) {
for (const auto& f : v->files_[level]) {
@ -2260,7 +2264,7 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
Compaction* VersionSet::PickCompaction() {
// TODO this only works for default column family now
Version* version = column_family_set_->GetDefault()->current;
Version* version = column_family_set_->GetDefault()->current();
return compaction_picker_->PickCompaction(version);
}
@ -2269,7 +2273,7 @@ Compaction* VersionSet::CompactRange(int input_level, int output_level,
const InternalKey* end,
InternalKey** compaction_end) {
// TODO this only works for default column family now
Version* version = column_family_set_->GetDefault()->current;
Version* version = column_family_set_->GetDefault()->current();
return compaction_picker_->CompactRange(version, input_level, output_level,
begin, end, compaction_end);
}
@ -2321,7 +2325,7 @@ uint64_t VersionSet::MaxFileSizeForLevel(int level) {
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
#ifndef NDEBUG
// TODO this only works for default column family now
Version* version = column_family_set_->GetDefault()->current;
Version* version = column_family_set_->GetDefault()->current();
if (c->input_version() != version) {
Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch");
}
@ -2374,7 +2378,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
FileMetaData* meta,
ColumnFamilyData** cfd) {
for (auto cfd_iter : *column_family_set_) {
Version* version = cfd_iter->current;
Version* version = cfd_iter->current();
for (int level = 0; level < version->NumberLevels(); level++) {
for (const auto& file : version->files_[level]) {
if (file->number == number) {
@ -2392,7 +2396,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
for (auto cfd : *column_family_set_) {
for (int level = 0; level < NumberLevels(); level++) {
for (const auto& file : cfd->current->files_[level]) {
for (const auto& file : cfd->current()->files_[level]) {
LiveFileMetaData filemetadata;
filemetadata.name = TableFileName("", file->number);
filemetadata.level = level;

@ -363,8 +363,8 @@ class VersionSet {
uint64_t MinLogNumber() const {
uint64_t min_log_num = 0;
for (auto cfd : *column_family_set_) {
if (min_log_num == 0 || min_log_num > cfd->log_number) {
min_log_num = cfd->log_number;
if (min_log_num == 0 || min_log_num > cfd->GetLogNumber()) {
min_log_num = cfd->GetLogNumber();
}
}
return min_log_num;
@ -448,7 +448,7 @@ class VersionSet {
friend class Compaction;
friend class Version;
// TODO(icanadi) temporarily until we have what ColumnFamilyData needs (icmp_)
friend struct ColumnFamilyData;
friend class ColumnFamilyData;
struct LogReporter : public log::Reader::Reporter {
Status* status;

@ -1028,7 +1028,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
int max = -1;
auto default_cfd = versions.GetColumnFamilySet()->GetDefault();
for (int i = 0; i < versions.NumberLevels(); i++) {
if (default_cfd->current->NumLevelFiles(i)) {
if (default_cfd->current()->NumLevelFiles(i)) {
max = i;
}
}

Loading…
Cancel
Save