[column families] Move memtable and immutable memtable list to column family data

Summary: All memtables and immutable memtables are moved from DBImpl to ColumnFamilyData. For now, they are all referenced from the default column family in DBImpl. It shouldn't be hard to get them from a custom column family.

Test Plan: make check

Reviewers: dhruba, kailiu, sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15459
main
Igor Canadi 11 years ago
parent ae16606f98
commit eb055609e4
  1. 88
      db/column_family.cc
  2. 70
      db/column_family.h
  3. 209
      db/db_impl.cc
  4. 44
      db/db_impl.h
  5. 3
      db/db_impl_readonly.cc
  6. 3
      db/memtable.cc
  7. 4
      db/memtable.h
  8. 1
      db/memtablelist.cc
  9. 2
      db/memtablelist.h
  10. 4
      db/version_set.cc
  11. 2
      db/version_set.h
  12. 2
      db/write_batch_test.cc
  13. 3
      include/rocksdb/db.h
  14. 30
      include/rocksdb/options.h
  15. 3
      table/table_test.cc
  16. 4
      util/options.cc

@ -1,8 +1,64 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/column_family.h"
#include <vector>
#include <string>
#include <algorithm>
#include "db/version_set.h"
namespace rocksdb {
// Preallocates room in to_delete for up to num_memtables entries so that
// Cleanup() — which runs with the db mutex held — does not need to allocate.
// NOTE: the previous code used resize(), which inserted num_memtables null
// pointers; the push_back()s done later in Cleanup() then grew the vector
// past that capacity and could allocate under the mutex. reserve()
// preallocates capacity without inserting dummy elements.
SuperVersion::SuperVersion(const int num_memtables) {
to_delete.reserve(num_memtables);
}
// Frees every memtable handed over by Cleanup(). Runs outside the db
// mutex (see the to_delete comment in the struct declaration).
SuperVersion::~SuperVersion() {
for (MemTable* memtable : to_delete) {
delete memtable;
}
}
// Atomically takes one additional reference on this SuperVersion and
// returns `this` so callers can write `sv = other->Ref();`.
// Relaxed ordering is sufficient: the refcount is only used to decide
// who frees the object (see Unref()).
SuperVersion* SuperVersion::Ref() {
refs.fetch_add(1, std::memory_order_relaxed);
return this;
}
// Atomically drops one reference. Returns true exactly when the count
// went from 1 to 0, i.e. the caller held the last reference and is now
// responsible for calling Cleanup() and deleting the object.
bool SuperVersion::Unref() {
assert(refs > 0);
// fetch_sub returns the value held *before* the decrement.
const uint32_t previous_refs = refs.fetch_sub(1, std::memory_order_relaxed);
return previous_refs == 1;
}
// Drops the references this SuperVersion holds on mem, imm and current.
// Per the struct declaration, must be called with the db mutex held and
// only after the final Unref() (asserted below). Memtables that become
// unreferenced here are collected into to_delete so the actual frees can
// happen outside the mutex, in ~SuperVersion().
void SuperVersion::Cleanup() {
assert(refs.load(std::memory_order_relaxed) == 0);
// Unref the immutable-memtable list version; any memtables it was the
// last holder of are appended to to_delete.
imm->Unref(&to_delete);
// MemTable::Unref() returns the memtable itself when this was the last
// reference, nullptr otherwise.
MemTable* m = mem->Unref();
if (m != nullptr) {
to_delete.push_back(m);
}
current->Unref();
}
// Installs (new_mem, new_imm, new_current) into this SuperVersion, taking
// one reference on each, and resets this SuperVersion's refcount to 1.
// Per the struct declaration, must be called with the db mutex held.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current) {
mem = new_mem;
imm = new_imm;
current = new_current;
mem->Ref();
imm->Ref();
current->Ref();
refs.store(1, std::memory_order_relaxed);
}
ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions,
const ColumnFamilyOptions& options)
@ -10,12 +66,40 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
name(name),
dummy_versions(dummy_versions),
current(nullptr),
options(options) {}
options(options),
mem(nullptr),
imm(options.min_write_buffer_number_to_merge),
super_version(nullptr) {}
// Tears down a column family's state: the installed super version (which
// must hold its final reference by now), the version list, the mutable
// memtable and the immutable memtable list.
ColumnFamilyData::~ColumnFamilyData() {
if (super_version != nullptr) {
// We expect to be the last referrer; Unref() must report the final
// reference before Cleanup()/delete are legal.
bool is_last_reference __attribute__((unused));
is_last_reference = super_version->Unref();
assert(is_last_reference);
super_version->Cleanup();
delete super_version;
}
// List must be empty
assert(dummy_versions->next_ == dummy_versions);
delete dummy_versions;
// Drop our reference to the mutable memtable. Unref() returns the
// memtable when this was the last reference; deleting nullptr otherwise
// is harmless.
if (mem != nullptr) {
delete mem->Unref();
}
// Unref the current immutable-memtable list version; memtables it was
// the last holder of come back via to_delete and are freed here.
std::vector<MemTable*> to_delete;
imm.current()->Unref(&to_delete);
for (MemTable* m : to_delete) {
delete m;
}
}
// Replaces the mutable memtable with a freshly allocated one, dropping
// this ColumnFamilyData's reference to the previous memtable (which is
// deleted here if that was its last reference).
void ColumnFamilyData::CreateNewMemtable() {
assert(current != nullptr);
if (mem != nullptr) {
delete mem->Unref();
}
// NOTE(review): reaches into VersionSet internals (vset_->icmp_) for the
// comparator — relies on ColumnFamilyData being a friend of VersionSet
// (see the "TODO temporarily until we have what ColumnFamilyData needs"
// friend declaration in version_set.h).
mem = new MemTable(current->vset_->icmp_, options);
mem->Ref();
}
// Starts with an empty set. NOTE(review): max_column_family_ appears to
// seed id allocation via GetNextColumnFamilyID() — confirm in the header.
ColumnFamilySet::ColumnFamilySet() : max_column_family_(0) {}
@ -31,7 +115,7 @@ ColumnFamilySet::~ColumnFamilySet() {
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
auto ret = GetColumnFamily(0);
assert(ret != nullptr); // default column family should always exist
assert(ret != nullptr); // default column family should always exist
return ret;
}

@ -9,16 +9,47 @@
#pragma once
#include "rocksdb/options.h"
#include <unordered_map>
#include <string>
#include <vector>
#include "rocksdb/options.h"
#include "db/memtablelist.h"
namespace rocksdb {
class Version;
class VersionSet;
class MemTable;
class MemTableListVersion;
// holds references to memtable, all immutable memtables and version
struct SuperVersion {
MemTable* mem;
MemTableListVersion* imm;
Version* current;
std::atomic<uint32_t> refs;
// We need to_delete because during Cleanup(), imm->Unref() returns
// all memtables that we need to free through this vector. We then
// delete all those memtables outside of mutex, during destruction
std::vector<MemTable*> to_delete;
// should be called outside the mutex
explicit SuperVersion(const int num_memtables = 0);
~SuperVersion();
// Bumps the refcount; returns this for call-chaining convenience.
SuperVersion* Ref();
// Returns true if this was the last reference and caller should
// call Cleanup() and delete the object
bool Unref();
// call these two methods with db mutex held
// Cleanup unrefs mem, imm and current. Also, it stores all memtables
// that needs to be deleted in to_delete vector. Unrefing those
// objects needs to be done in the mutex
void Cleanup();
void Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current);
};
// column family metadata
struct ColumnFamilyData {
@ -28,27 +59,34 @@ struct ColumnFamilyData {
Version* current; // == dummy_versions->prev_
ColumnFamilyOptions options;
MemTable* mem;
MemTableList imm;
SuperVersion* super_version;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, const ColumnFamilyOptions& options);
~ColumnFamilyData();
void CreateNewMemtable();
};
class ColumnFamilySet {
public:
class iterator {
public:
explicit iterator(
std::unordered_map<uint32_t, ColumnFamilyData*>::iterator itr)
: itr_(itr) {}
iterator& operator++() {
++itr_;
return *this;
}
bool operator!=(const iterator& other) { return this->itr_ != other.itr_; }
ColumnFamilyData* operator*() { return itr_->second; }
private:
std::unordered_map<uint32_t, ColumnFamilyData*>::iterator itr_;
};
class iterator {
public:
explicit iterator(
std::unordered_map<uint32_t, ColumnFamilyData*>::iterator itr)
: itr_(itr) {}
iterator& operator++() {
++itr_;
return *this;
}
bool operator!=(const iterator& other) { return this->itr_ != other.itr_; }
ColumnFamilyData* operator*() { return itr_->second; }
private:
std::unordered_map<uint32_t, ColumnFamilyData*>::iterator itr_;
};
ColumnFamilySet();
~ColumnFamilySet();

@ -265,11 +265,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
mutex_(options.use_adaptive_mutex),
shutting_down_(nullptr),
bg_cv_(&mutex_),
mem_rep_factory_(options_.memtable_factory.get()),
mem_(new MemTable(internal_comparator_, options_)),
imm_(options_.min_write_buffer_number_to_merge),
logfile_number_(0),
super_version_(nullptr),
super_version_number_(0),
tmp_batch_(),
bg_compaction_scheduled_(0),
@ -297,8 +293,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
bg_work_gate_closed_(false),
refitting_level_(false) {
mem_->Ref();
env_->GetAbsolutePath(dbname, &db_absolute_path_);
stall_leveln_slowdown_.resize(options.num_levels);
@ -333,11 +327,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
}
DBImpl::~DBImpl() {
std::vector<MemTable*> to_delete;
to_delete.reserve(options_.max_write_buffer_number);
// Wait for background work to finish
if (flush_on_destroy_ && mem_->GetFirstSequenceNumber() != 0) {
if (flush_on_destroy_ && default_cfd_->mem->GetFirstSequenceNumber() != 0) {
FlushMemTable(FlushOptions());
}
mutex_.Lock();
@ -347,27 +338,12 @@ DBImpl::~DBImpl() {
bg_logstats_scheduled_) {
bg_cv_.Wait();
}
if (super_version_ != nullptr) {
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
}
mutex_.Unlock();
if (db_lock_ != nullptr) {
env_->UnlockFile(db_lock_);
}
if (mem_ != nullptr) {
delete mem_->Unref();
}
imm_.current()->Unref(&to_delete);
for (MemTable* m: to_delete) {
delete m;
}
LogFlush(options_.info_log);
}
@ -383,13 +359,6 @@ void DBImpl::TEST_Destroy_DBImpl() {
bg_logstats_scheduled_) {
bg_cv_.Wait();
}
if (super_version_ != nullptr) {
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
}
// Prevent new compactions from occuring.
bg_work_gate_closed_ = true;
@ -488,49 +457,6 @@ void DBImpl::MaybeDumpStats() {
}
}
// DBImpl::SuperVersion methods
DBImpl::SuperVersion::SuperVersion(const int num_memtables) {
to_delete.resize(num_memtables);
}
DBImpl::SuperVersion::~SuperVersion() {
for (auto td : to_delete) {
delete td;
}
}
DBImpl::SuperVersion* DBImpl::SuperVersion::Ref() {
refs.fetch_add(1, std::memory_order_relaxed);
return this;
}
bool DBImpl::SuperVersion::Unref() {
assert(refs > 0);
// fetch_sub returns the previous value of ref
return refs.fetch_sub(1, std::memory_order_relaxed) == 1;
}
void DBImpl::SuperVersion::Cleanup() {
assert(refs.load(std::memory_order_relaxed) == 0);
imm->Unref(&to_delete);
MemTable* m = mem->Unref();
if (m != nullptr) {
to_delete.push_back(m);
}
current->Unref();
}
void DBImpl::SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current) {
mem = new_mem;
imm = new_imm;
current = new_current;
mem->Ref();
imm->Ref();
current->Ref();
refs.store(1, std::memory_order_relaxed);
}
// Returns the list of live files in 'sst_live' and the list
// of all files in the filesystem in 'all_files'.
// no_full_scan = true -- never do the full scan using GetChildren()
@ -925,6 +851,7 @@ Status DBImpl::Recover(
Status s = versions_->Recover(column_families);
if (s.ok()) {
SequenceNumber max_sequence(0);
default_cfd_ = versions_->GetColumnFamilySet()->GetDefault();
// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
@ -1037,7 +964,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
}
WriteBatchInternal::SetContents(&batch, record);
status = WriteBatchInternal::InsertInto(&batch, mem_, &options_);
status =
WriteBatchInternal::InsertInto(&batch, default_cfd_->mem, &options_);
memtable_empty = false;
MaybeIgnoreError(&status);
if (!status.ok()) {
@ -1050,13 +978,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
*max_sequence = last_seq;
}
if (!read_only &&
mem_->ApproximateMemoryUsage() > options_.write_buffer_size) {
status = WriteLevel0TableForRecovery(mem_, &edit);
if (!read_only && default_cfd_->mem->ApproximateMemoryUsage() >
options_.write_buffer_size) {
status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit);
// we still want to clear memtable, even if the recovery failed
delete mem_->Unref();
mem_ = new MemTable(internal_comparator_, options_);
mem_->Ref();
default_cfd_->CreateNewMemtable();
memtable_empty = true;
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
@ -1067,10 +993,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
}
if (!memtable_empty && !read_only) {
status = WriteLevel0TableForRecovery(mem_, &edit);
delete mem_->Unref();
mem_ = new MemTable(internal_comparator_, options_);
mem_->Ref();
status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit);
default_cfd_->CreateNewMemtable();
if (!status.ok()) {
return status;
}
@ -1233,9 +1157,9 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
DeletionState& deletion_state) {
mutex_.AssertHeld();
assert(imm_.size() != 0);
assert(default_cfd_->imm.size() != 0);
if (!imm_.IsFlushPending()) {
if (!default_cfd_->imm.IsFlushPending()) {
Log(options_.info_log, "FlushMemTableToOutputFile already in progress");
Status s = Status::IOError("FlushMemTableToOutputFile already in progress");
return s;
@ -1244,7 +1168,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
// Save the contents of the earliest memtable as a new Table
uint64_t file_number;
std::vector<MemTable*> mems;
imm_.PickMemtablesToFlush(&mems);
default_cfd_->imm.PickMemtablesToFlush(&mems);
if (mems.empty()) {
Log(options_.info_log, "Nothing in memstore to flush");
Status s = Status::IOError("Nothing in memstore to flush");
@ -1279,12 +1203,12 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
}
// Replace immutable memtable with the generated Table
s = imm_.InstallMemtableFlushResults(
s = default_cfd_->imm.InstallMemtableFlushResults(
mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number,
pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get());
if (s.ok()) {
InstallSuperVersion(deletion_state);
InstallSuperVersion(default_cfd_, deletion_state);
if (madeProgress) {
*madeProgress = 1;
}
@ -1410,7 +1334,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
edit.DebugString().data());
status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
superversion_to_free = InstallSuperVersion(new_superversion);
superversion_to_free = InstallSuperVersion(default_cfd_, new_superversion);
new_superversion = nullptr;
Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());
@ -1737,10 +1661,10 @@ Status DBImpl::WaitForFlushMemTable() {
Status s;
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_.size() > 0 && bg_error_.ok()) {
while (default_cfd_->imm.size() > 0 && bg_error_.ok()) {
bg_cv_.Wait();
}
if (imm_.size() != 0) {
if (default_cfd_->imm.size() != 0) {
s = bg_error_;
}
return s;
@ -1776,7 +1700,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else {
bool is_flush_pending = imm_.IsFlushPending();
bool is_flush_pending = default_cfd_->imm.IsFlushPending();
if (is_flush_pending &&
(bg_flush_scheduled_ < options_.max_background_flushes)) {
// memtable flush needed
@ -1811,7 +1735,7 @@ void DBImpl::BGWorkCompaction(void* db) {
Status DBImpl::BackgroundFlush(bool* madeProgress,
DeletionState& deletion_state) {
Status stat;
while (stat.ok() && imm_.IsFlushPending()) {
while (stat.ok() && default_cfd_->imm.IsFlushPending()) {
Log(options_.info_log,
"BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d",
options_.max_background_flushes - bg_flush_scheduled_);
@ -1931,7 +1855,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
mutex_.AssertHeld();
// TODO: remove memtable flush from formal compaction
while (imm_.IsFlushPending()) {
while (default_cfd_->imm.IsFlushPending()) {
Log(options_.info_log,
"BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
"available %d",
@ -1983,7 +1907,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get());
InstallSuperVersion(deletion_state);
InstallSuperVersion(default_cfd_, deletion_state);
Version::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1,
@ -2334,11 +2259,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
// TODO: remove memtable flush from normal compaction work
if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
if (default_cfd_->imm.imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
LogFlush(options_.info_log);
mutex_.Lock();
if (imm_.IsFlushPending()) {
if (default_cfd_->imm.IsFlushPending()) {
FlushMemTableToOutputFile(nullptr, deletion_state);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
@ -2649,7 +2574,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
if (status.ok()) {
status = InstallCompactionResults(compact);
InstallSuperVersion(deletion_state);
InstallSuperVersion(default_cfd_, deletion_state);
}
Version::LevelSummaryStorage tmp;
Log(options_.info_log,
@ -2716,10 +2641,9 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
// Collect together all needed child iterators for mem
mutex_.Lock();
*latest_snapshot = versions_->LastSequence();
mem_->Ref();
mutable_mem = mem_;
// Collect together all needed child iterators for imm_
immutable_mems = imm_.current();
mutable_mem = default_cfd_->mem;
mutable_mem->Ref();
immutable_mems = default_cfd_->imm.current();
immutable_mems->Ref();
versions_->current()->Ref();
version = versions_->current();
@ -2758,9 +2682,9 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
// get all child iterators and bump their refcounts under lock
mutex_.Lock();
mutable_mem = mem_;
mutable_mem = default_cfd_->mem;
mutable_mem->Ref();
immutable_mems = imm_.current();
immutable_mems = default_cfd_->imm.current();
immutable_mems->Ref();
version = versions_->current();
version->Ref();
@ -2823,12 +2747,13 @@ Status DBImpl::Get(const ReadOptions& options,
// first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free
void DBImpl::InstallSuperVersion(DeletionState& deletion_state) {
void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
DeletionState& deletion_state) {
// if new_superversion == nullptr, it means somebody already used it
SuperVersion* new_superversion =
(deletion_state.new_superversion != nullptr) ?
deletion_state.new_superversion : new SuperVersion();
SuperVersion* old_superversion = InstallSuperVersion(new_superversion);
SuperVersion* old_superversion = InstallSuperVersion(cfd, new_superversion);
deletion_state.new_superversion = nullptr;
if (deletion_state.superversion_to_free != nullptr) {
// somebody already put it there
@ -2838,13 +2763,16 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) {
}
}
DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
SuperVersion* new_superversion) {
SuperVersion* DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
SuperVersion* new_superversion) {
mutex_.AssertHeld();
new_superversion->Init(mem_, imm_.current(), versions_->current());
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
++super_version_number_;
new_superversion->Init(cfd->mem, cfd->imm.current(), cfd->current);
SuperVersion* old_superversion = cfd->super_version;
cfd->super_version = new_superversion;
if (cfd->id == 0) {
// TODO this is only for default column family for now
++super_version_number_;
}
if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup();
return old_superversion; // will let caller delete outside of mutex
@ -2868,7 +2796,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// This can be replaced by using atomics and spinlock instead of big mutex
mutex_.Lock();
SuperVersion* get_version = super_version_->Ref();
SuperVersion* get_version = default_cfd_->super_version->Ref();
mutex_.Unlock();
bool have_stat_update = false;
@ -2939,9 +2867,10 @@ std::vector<Status> DBImpl::MultiGet(
snapshot = versions_->LastSequence();
}
MemTable* mem = mem_;
MemTableListVersion* imm = imm_.current();
Version* current = versions_->current();
// TODO only works for default column family
MemTable* mem = default_cfd_->mem;
MemTableListVersion* imm = default_cfd_->imm.current();
Version* current = default_cfd_->current;
mem->Ref();
imm->Ref();
current->Ref();
@ -3012,12 +2941,12 @@ std::vector<Status> DBImpl::MultiGet(
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle* handle) {
MutexLock l(&mutex_);
if (versions_->GetColumnFamilySet()->Exists(column_family_name)) {
return Status::InvalidArgument("Column family already exists");
}
VersionEdit edit;
edit.AddColumnFamily(column_family_name);
MutexLock l(&mutex_);
handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
edit.SetColumnFamily(handle->id);
Status s = versions_->LogAndApply(&edit, &mutex_);
@ -3170,7 +3099,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
// into default_cfd_->mem.
{
mutex_.Unlock();
WriteBatch* updates = nullptr;
@ -3216,7 +3145,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_, &options_, this,
status = WriteBatchInternal::InsertInto(updates, default_cfd_->mem,
&options_, this,
options_.filter_deletes);
if (!status.ok()) {
// Panic for in-memory corruptions
@ -3382,14 +3312,15 @@ Status DBImpl::MakeRoomForWrite(bool force,
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
delayed_writes_++;
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
} else if (!force && (default_cfd_->mem->ApproximateMemoryUsage() <=
options_.write_buffer_size)) {
// There is room in current memtable
if (allow_delay) {
DelayLoggingAndReset();
}
break;
} else if (imm_.size() == options_.max_write_buffer_number - 1) {
} else if (default_cfd_->imm.size() ==
options_.max_write_buffer_number - 1) {
// We have filled up the current memtable, but the previous
// ones are still being compacted, so we wait.
DelayLoggingAndReset();
@ -3498,20 +3429,21 @@ Status DBImpl::MakeRoomForWrite(bool force,
}
logfile_number_ = new_log_number;
log_.reset(new log::Writer(std::move(lfile)));
mem_->SetNextLogNumber(logfile_number_);
imm_.Add(mem_);
default_cfd_->mem->SetNextLogNumber(logfile_number_);
default_cfd_->imm.Add(default_cfd_->mem);
if (force) {
imm_.FlushRequested();
default_cfd_->imm.FlushRequested();
}
mem_ = memtmp;
mem_->Ref();
default_cfd_->mem = memtmp;
default_cfd_->mem->Ref();
Log(options_.info_log,
"New memtable created with log file: #%lu\n",
(unsigned long)logfile_number_);
mem_->SetLogNumber(logfile_number_);
default_cfd_->mem->SetLogNumber(logfile_number_);
force = false; // Do not force another compaction if have room
MaybeScheduleFlushOrCompaction();
*superversion_to_free = InstallSuperVersion(new_superversion);
*superversion_to_free =
InstallSuperVersion(default_cfd_, new_superversion);
}
}
return s;
@ -3802,7 +3734,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
*value = versions_->current()->DebugString();
return true;
} else if (in == "num-immutable-mem-table") {
*value = std::to_string(imm_.size());
*value = std::to_string(default_cfd_->imm.size());
return true;
}
@ -3900,7 +3832,7 @@ Status DBImpl::DeleteFile(std::string name) {
edit.DeleteFile(level, number);
status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
if (status.ok()) {
InstallSuperVersion(deletion_state);
InstallSuperVersion(default_cfd_, deletion_state);
}
FindObsoleteFiles(deletion_state, false);
} // lock released here
@ -4028,7 +3960,8 @@ Status DB::OpenWithColumnFamilies(
return s;
}
impl->mutex_.Lock();
s = impl->Recover(column_families); // Handles create_if_missing, error_if_exists
// Handles create_if_missing, error_if_exists
s = impl->Recover(column_families);
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
unique_ptr<WritableFile> lfile;
@ -4061,8 +3994,10 @@ Status DB::OpenWithColumnFamilies(
}
}
if (s.ok()) {
delete impl->InstallSuperVersion(new DBImpl::SuperVersion());
impl->mem_->SetLogNumber(impl->logfile_number_);
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
delete impl->InstallSuperVersion(cfd, new SuperVersion());
cfd->mem->SetLogNumber(impl->logfile_number_);
}
impl->DeleteObsoleteFiles();
impl->MaybeScheduleFlushOrCompaction();
impl->MaybeScheduleLogDBDeployStats();

@ -15,6 +15,7 @@
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "db/column_family.h"
#include "db/version_edit.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
@ -174,34 +175,6 @@ class DBImpl : public DB {
default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL;
}
// holds references to memtable, all immutable memtables and version
struct SuperVersion {
MemTable* mem;
MemTableListVersion* imm;
Version* current;
std::atomic<uint32_t> refs;
// We need to_delete because during Cleanup(), imm->Unref() returns
// all memtables that we need to free through this vector. We then
// delete all those memtables outside of mutex, during destruction
std::vector<MemTable*> to_delete;
// should be called outside the mutex
explicit SuperVersion(const int num_memtables = 0);
~SuperVersion();
SuperVersion* Ref();
// Returns true if this was the last reference and caller should
// call Cleanup() and delete the object
bool Unref();
// call these two methods with db mutex held
// Cleanup unrefs mem, imm and current. Also, it stores all memtables
// that needs to be deleted in to_delete vector. Unrefing those
// objects needs to be done in the mutex
void Cleanup();
void Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current);
};
// needed for CleanupIteratorState
struct DeletionState {
inline bool HaveSomethingToDelete() const {
@ -286,7 +259,8 @@ class DBImpl : public DB {
}
MemTable* GetMemTable() {
return mem_;
// TODO currently only works for default column family
return default_cfd_->mem;
}
Iterator* NewInternalIterator(const ReadOptions&,
@ -425,13 +399,9 @@ class DBImpl : public DB {
port::Mutex mutex_;
port::AtomicPointer shutting_down_;
port::CondVar bg_cv_; // Signalled when background work finishes
MemTableRepFactory* mem_rep_factory_;
MemTable* mem_;
MemTableList imm_; // Memtable that are not changing
uint64_t logfile_number_;
unique_ptr<log::Writer> log_;
SuperVersion* super_version_;
ColumnFamilyData* default_cfd_;
// An ordinal representing the current SuperVersion. Updated by
// InstallSuperVersion(), i.e. incremented every time super_version_
@ -625,11 +595,13 @@ class DBImpl : public DB {
// Foreground threads call this function directly (they don't carry
// deletion state and have to handle their own creation and deletion
// of SuperVersion)
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion);
SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd,
SuperVersion* new_superversion);
// Background threads call this function, which is just a wrapper around
// the InstallSuperVersion() function above. Background threads carry
// deletion_state which can have new_superversion already allocated.
void InstallSuperVersion(DeletionState& deletion_state);
void InstallSuperVersion(ColumnFamilyData* cfd,
DeletionState& deletion_state);
// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here

@ -90,7 +90,8 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(default_column_family_name, cf_options));
Status s = impl->Recover(column_families, true /* read only */, error_if_log_file_exist);
Status s = impl->Recover(column_families, true /* read only */,
error_if_log_file_exist);
impl->mutex_.Unlock();
if (s.ok()) {
*dbptr = impl;

@ -33,7 +33,8 @@ struct hash<rocksdb::Slice> {
namespace rocksdb {
MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
MemTable::MemTable(const InternalKeyComparator& cmp,
const ColumnFamilyOptions& options)
: comparator_(cmp),
refs_(0),
arena_impl_(options.arena_block_size),

@ -13,7 +13,7 @@
#include <deque>
#include "db/dbformat.h"
#include "db/skiplist.h"
#include "db/version_set.h"
#include "db/version_edit.h"
#include "rocksdb/db.h"
#include "rocksdb/memtablerep.h"
#include "util/arena_impl.h"
@ -35,7 +35,7 @@ class MemTable {
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator,
const Options& options = Options());
const ColumnFamilyOptions& options);
~MemTable();

@ -8,6 +8,7 @@
#include <string>
#include "rocksdb/db.h"
#include "db/memtable.h"
#include "db/version_set.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "util/coding.h"

@ -13,7 +13,7 @@
#include "rocksdb/iterator.h"
#include "db/dbformat.h"
#include "db/skiplist.h"
#include "memtable.h"
#include "db/memtable.h"
namespace rocksdb {

@ -1392,6 +1392,9 @@ VersionSet::~VersionSet() {
for (auto cfd : *column_family_set_) {
cfd->current->Unref();
}
// we need to delete column_family_set_ because its destructor depends on
// VersionSet
column_family_set_.reset();
for (auto file : obsolete_files_) {
delete file;
}
@ -2401,6 +2404,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
edit->column_family_name_, edit->column_family_, dummy_versions, options);
AppendVersion(new_cfd, new Version(this, current_version_number_++));
new_cfd->CreateNewMemtable();
return new_cfd;
}

@ -446,6 +446,8 @@ class VersionSet {
friend class Compaction;
friend class Version;
// TODO temporarily until we have what ColumnFamilyData needs (icmp_)
friend struct ColumnFamilyData;
struct LogReporter : public log::Reader::Reporter {
Status* status;

@ -24,7 +24,7 @@ static std::string PrintContents(WriteBatch* b) {
auto factory = std::make_shared<SkipListFactory>();
Options options;
options.memtable_factory = factory;
MemTable* mem = new MemTable(cmp, options);
MemTable* mem = new MemTable(cmp, ColumnFamilyOptions(options));
mem->Ref();
std::string state;
Status s = WriteBatchInternal::InsertInto(b, mem, &options);

@ -315,7 +315,8 @@ class DB {
int target_level = -1) = 0;
Status CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1) {
return CompactRange(default_column_family, begin, end, reduce_level, target_level);
return CompactRange(default_column_family, begin, end, reduce_level,
target_level);
}
// Number of levels used for this DB.

@ -357,6 +357,21 @@ struct ColumnFamilyOptions {
// order.
int table_cache_remove_scan_count_limit;
// Size of one block in arena memory allocation.
// If <= 0, a proper value is automatically calculated (usually 1/10 of
// write_buffer_size).
//
// There are two additional restrictions on the specified size:
// (1) the size should be in the range of [4096, 2 << 30], and
// (2) it should be a multiple of the CPU word size (which helps with
// memory alignment).
//
// We'll automatically check and adjust the size so that it conforms to
// these restrictions.
//
// Default: 0
size_t arena_block_size;
// Disable automatic compactions. Manual compactions can still
// be issued on this column family
bool disable_auto_compactions;
@ -562,21 +577,6 @@ struct DBOptions {
// The default value is MAX_INT so that roll-over does not take place.
uint64_t max_manifest_file_size;
// Size of one block in arena memory allocation.
// If <= 0, a proper value is automatically calculated (usually 1/10 of
// write_buffer_size).
//
// There are two additional restrictions on the specified size:
// (1) the size should be in the range of [4096, 2 << 30], and
// (2) it should be a multiple of the CPU word size (which helps with
// memory alignment).
//
// We'll automatically check and adjust the size so that it conforms to
// these restrictions.
//
// Default: 0
size_t arena_block_size;
// The following two fields affect how archived logs will be deleted.
// 1. If both set to 0, logs will be deleted asap and will not get into
// the archive.

@ -371,7 +371,8 @@ class MemTableConstructor: public Constructor {
table_factory_(new SkipListFactory) {
Options options;
options.memtable_factory = table_factory_;
memtable_ = new MemTable(internal_comparator_, options);
memtable_ =
new MemTable(internal_comparator_, ColumnFamilyOptions(options));
memtable_->Ref();
}
~MemTableConstructor() {

@ -63,6 +63,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
no_block_cache(false),
table_cache_numshardbits(4),
table_cache_remove_scan_count_limit(16),
arena_block_size(0),
disable_auto_compactions(false),
purge_redundant_kvs_while_flush(true),
block_size_deviation(10),
@ -121,6 +122,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
table_cache_numshardbits(options.table_cache_numshardbits),
table_cache_remove_scan_count_limit(
options.table_cache_remove_scan_count_limit),
arena_block_size(options.arena_block_size),
disable_auto_compactions(options.disable_auto_compactions),
purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush),
block_size_deviation(options.block_size_deviation),
@ -157,7 +159,6 @@ DBOptions::DBOptions()
log_file_time_to_roll(0),
keep_log_file_num(1000),
max_manifest_file_size(std::numeric_limits<uint64_t>::max()),
arena_block_size(0),
WAL_ttl_seconds(0),
WAL_size_limit_MB(0),
manifest_preallocation_size(4 * 1024 * 1024),
@ -193,7 +194,6 @@ DBOptions::DBOptions(const Options& options)
log_file_time_to_roll(options.log_file_time_to_roll),
keep_log_file_num(options.keep_log_file_num),
max_manifest_file_size(options.max_manifest_file_size),
arena_block_size(options.arena_block_size),
WAL_ttl_seconds(options.WAL_ttl_seconds),
WAL_size_limit_MB(options.WAL_size_limit_MB),
manifest_preallocation_size(options.manifest_preallocation_size),

Loading…
Cancel
Save