You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rocksdb/db/column_family.cc

786 lines
27 KiB

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/column_family.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <vector>
#include <string>
#include <algorithm>
#include <limits>
#include "db/db_impl.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/internal_stats.h"
#include "db/compaction_picker.h"
#include "db/table_properties_collector.h"
#include "db/write_controller.h"
#include "util/autovector.h"
#include "util/hash_skiplist_rep.h"
#include "util/options_helper.h"
namespace rocksdb {
namespace {
// Computes the write delay, in microseconds, for a load metric `n` (the number
// of level-0 files, or a compaction score) relative to a slowdown threshold
// `bottom` and a stop threshold `top`:
//   if n >= top, return 1000;
//   if n < bottom, return 0;
//   otherwise, let r = (n - bottom) /
//                      (top - bottom)
//   and return max(r^2 * 1000, 100).
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
//
// `n` is a double (not an int) so that fractional inputs such as compaction
// scores are not truncated before the ratio is computed; integer callers are
// unaffected because int converts to double exactly.
uint64_t SlowdownAmount(double n, double bottom, double top) {
  uint64_t delay;
  if (n >= top) {
    delay = 1000;
  } else if (n < bottom) {
    delay = 0;
  } else {
    // If we are here, we know that bottom <= n < top, since the previous two
    // conditions are false. Hence top - bottom > 0 and 0 <= how_much < 1.
    const double how_much = (n - bottom) / (top - bottom);
    // Inside the slowdown band the delay never drops below 100us.
    delay = static_cast<uint64_t>(std::max(how_much * how_much * 1000, 100.0));
  }
  assert(delay <= 1000);
  return delay;
}
} // namespace
// Builds a handle around `column_family_data`. A non-null column family gets
// one reference taken on it; `db` and `mutex` are kept for the destructor.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, port::Mutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ == nullptr) {
    return;
  }
  cfd_->Ref();
}
// Drops the reference taken by the constructor. Under the mutex, the column
// family is deleted if this was its last reference and obsolete files are
// collected; the actual purge runs after the mutex is released.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ == nullptr) {
    // Nothing was referenced, so there is nothing to release.
    return;
  }
  JobContext job_context;
  mutex_->Lock();
  const bool last_reference = cfd_->Unref();
  if (last_reference) {
    delete cfd_;
  }
  db_->FindObsoleteFiles(&job_context, false, true);
  mutex_->Unlock();
  // File deletion is done outside the mutex.
  if (job_context.HaveSomethingToDelete()) {
    db_->PurgeObsoleteFiles(job_context);
  }
}
// Returns the ID of the wrapped column family.
uint32_t ColumnFamilyHandleImpl::GetID() const {
  const uint32_t id = cfd()->GetID();
  return id;
}
// Returns the name of the wrapped column family.
const std::string& ColumnFamilyHandleImpl::GetName() const {
  const std::string& name = cfd()->GetName();
  return name;
}
// Returns the user comparator of the wrapped column family.
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
  const Comparator* comparator = cfd()->user_comparator();
  return comparator;
}
ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
const ColumnFamilyOptions& src) {
ColumnFamilyOptions result = src;
result.comparator = icmp;
#ifdef OS_MACOSX
// TODO(icanadi) make write_buffer_size uint64_t instead of size_t
ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, ((size_t)1) << 30);
#else
ClipToRange(&result.write_buffer_size,
((size_t)64) << 10, ((size_t)64) << 30);
#endif
// if user sets arena_block_size, we trust user to use this value. Otherwise,
// calculate a proper value from writer_buffer_size;
if (result.arena_block_size <= 0) {
result.arena_block_size = result.write_buffer_size / 10;
}
result.min_write_buffer_number_to_merge =
std::min(result.min_write_buffer_number_to_merge,
result.max_write_buffer_number - 1);
result.compression_per_level = src.compression_per_level;
if (result.max_mem_compaction_level >= result.num_levels) {
result.max_mem_compaction_level = result.num_levels - 1;
}
if (result.soft_rate_limit > result.hard_rate_limit) {
result.soft_rate_limit = result.hard_rate_limit;
}
if (result.max_write_buffer_number < 2) {
result.max_write_buffer_number = 2;
}
if (!result.prefix_extractor) {
assert(result.memtable_factory);
Slice name = result.memtable_factory->Name();
if (name.compare("HashSkipListRepFactory") == 0 ||
name.compare("HashLinkListRepFactory") == 0) {
result.memtable_factory = std::make_shared<SkipListFactory>();
}
}
// -- Sanitize the table properties collector
// All user defined properties collectors will be wrapped by
// UserKeyTablePropertiesCollector since for them they only have the
// knowledge of the user keys; internal keys are invisible to them.
TablePropertiesCollectorFactory Summary: This diff addresses task #4296714 and rethinks how users provide us with TablePropertiesCollectors as part of Options. Here's description of task #4296714: I'm debugging #4295529 and noticed that our count of user properties kDeletedKeys is wrong. We're sharing one single InternalKeyPropertiesCollector with all Table Builders. In LOG Files, we're outputting number of kDeletedKeys as connected with a single table, while it's actually the total count of deleted keys since creation of the DB. For example, this table has 3155 entries and 1391828 deleted keys. The problem with current approach that we call methods on a single TablePropertiesCollector for all the tables we create. Even worse, we could do it from multiple threads at the same time and TablePropertiesCollector has no way of knowing which table we're calling it for. Good part: Looks like nobody inside Facebook is using Options::table_properties_collectors. This means we should be able to painfully change the API. In this change, I introduce TablePropertiesCollectorFactory. For every table we create, we call `CreateTablePropertiesCollector`, which creates a TablePropertiesCollector for a single table. We then use it sequentially from a single thread, which means it doesn't have to be thread-safe. Test Plan: Added a test in table_properties_collector_test that fails on master (build two tables, assert that kDeletedKeys count is correct for the second one). Also, all other tests Reviewers: sdong, dhruba, haobo, kailiu Reviewed By: kailiu CC: leveldb Differential Revision: https://reviews.facebook.net/D18579
11 years ago
auto& collector_factories = result.table_properties_collector_factories;
for (size_t i = 0; i < result.table_properties_collector_factories.size();
++i) {
assert(collector_factories[i]);
collector_factories[i] =
std::make_shared<UserKeyTablePropertiesCollectorFactory>(
collector_factories[i]);
}
// Add collector to collect internal key statistics
TablePropertiesCollectorFactory Summary: This diff addresses task #4296714 and rethinks how users provide us with TablePropertiesCollectors as part of Options. Here's description of task #4296714: I'm debugging #4295529 and noticed that our count of user properties kDeletedKeys is wrong. We're sharing one single InternalKeyPropertiesCollector with all Table Builders. In LOG Files, we're outputting number of kDeletedKeys as connected with a single table, while it's actually the total count of deleted keys since creation of the DB. For example, this table has 3155 entries and 1391828 deleted keys. The problem with current approach that we call methods on a single TablePropertiesCollector for all the tables we create. Even worse, we could do it from multiple threads at the same time and TablePropertiesCollector has no way of knowing which table we're calling it for. Good part: Looks like nobody inside Facebook is using Options::table_properties_collectors. This means we should be able to painfully change the API. In this change, I introduce TablePropertiesCollectorFactory. For every table we create, we call `CreateTablePropertiesCollector`, which creates a TablePropertiesCollector for a single table. We then use it sequentially from a single thread, which means it doesn't have to be thread-safe. Test Plan: Added a test in table_properties_collector_test that fails on master (build two tables, assert that kDeletedKeys count is correct for the second one). Also, all other tests Reviewers: sdong, dhruba, haobo, kailiu Reviewed By: kailiu CC: leveldb Differential Revision: https://reviews.facebook.net/D18579
11 years ago
collector_factories.push_back(
std::make_shared<InternalKeyPropertiesCollectorFactory>());
if (result.compaction_style == kCompactionStyleFIFO) {
result.num_levels = 1;
// since we delete level0 files in FIFO compaction when there are too many
// of them, these options don't really mean anything
result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
}
return result;
}
// Backing object for kSVInUse: only its address is used below, giving the
// sentinel a unique non-null pointer value.
int SuperVersion::dummy = 0;
// Sentinel swapped into the thread-local SuperVersion slot while a thread is
// using its cached SuperVersion.
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
// Sentinel installed in thread-local slots when cached SuperVersions are
// scraped after a new SuperVersion is installed.
void* const SuperVersion::kSVObsolete = nullptr;
// Frees every memtable that was queued in to_delete.
SuperVersion::~SuperVersion() {
  for (auto* pending : to_delete) {
    delete pending;
  }
}
// Adds one reference and returns this object so calls can be chained.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  SuperVersion* const self = this;
  return self;
}
// Drops one reference. Returns true when the dropped reference was the last
// one, i.e. the count went from 1 to 0.
bool SuperVersion::Unref() {
  // fetch_sub hands back the count as it was before the decrement.
  const uint32_t before = refs.fetch_sub(1, std::memory_order_relaxed);
  assert(before > 0);
  const bool was_last = (before == 1);
  return was_last;
}
void SuperVersion::Cleanup() {
assert(refs.load(std::memory_order_relaxed) == 0);
imm->Unref(&to_delete);
MemTable* m = mem->Unref();
if (m != nullptr) {
to_delete.push_back(m);
}
current->Unref();
}
// Points this SuperVersion at the given memtable, immutable memtable list and
// version, takes one reference on each, and sets its own ref count to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  mem->Ref();

  imm = new_imm;
  imm->Ref();

  current = new_current;
  current->Ref();

  refs.store(1, std::memory_order_relaxed);
}
namespace {
// Unref handler registered with the ThreadLocalPtr that caches SuperVersions.
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. In the former case the thread shouldn't see kSVInUse. In the
  // latter case we are in ~ColumnFamilyData(), so no get should be happening
  // either.
  auto* cached = static_cast<SuperVersion*>(ptr);
  if (!cached->Unref()) {
    return;
  }
  // That was the last reference: clean up under the DB mutex, then free.
  cached->db_mutex->Lock();
  cached->Cleanup();
  cached->db_mutex->Unlock();
  delete cached;
}
} // anonymous namespace
// Constructs the in-memory state of one column family. The supplied
// `cf_options` are sanitized before being stored. The object takes one
// reference on itself. When `_dummy_versions` is nullptr this is a dummy
// column family and no stats, table cache or compaction picker are created.
ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
                                   Version* _dummy_versions,
                                   Cache* _table_cache,
                                   const ColumnFamilyOptions& cf_options,
                                   const DBOptions* db_options,
                                   const EnvOptions& env_options,
                                   ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options)),
      ioptions_(options_),
      mutable_cf_options_(options_, ioptions_),
      mem_(nullptr),
      imm_(options_.min_write_buffer_number_to_merge),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      column_family_set_(column_family_set) {
  Ref();
  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options->env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
    // Pick the compaction picker matching the configured compaction style.
    if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "Column family %s does not use any background compaction. "
          "Compactions can only be done via CompactFiles\n",
          GetName().c_str());
    } else {
      // Unknown style: log it and fall back to level compaction. The enum is
      // converted explicitly because it is consumed by a %d varargs slot.
      Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
          "Unable to recognize the specified compaction style %d. "
          "Column family %s will use kCompactionStyleLevel.\n",
          static_cast<int>(ioptions_.compaction_style), GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }
    Log(InfoLogLevel::INFO_LEVEL,
        ioptions_.info_log, "Options for column family \"%s\":\n",
        name.c_str());
    options_.Dump(ioptions_.info_log);
  }
  RecalculateWriteStallConditions(mutable_cf_options_);
}
// Tears down a column family whose reference count has reached zero.
// DB mutex held on entry; when a SuperVersion is installed, the mutex is
// temporarily released while the thread-local SuperVersion cache is destroyed.
ColumnFamilyData::~ColumnFamilyData() {
assert(refs_ == 0);
// remove from linked list
auto prev = prev_;
auto next = next_;
prev->next_ = next;
next->prev_ = prev;
// it's nullptr for dummy CFD
if (column_family_set_ != nullptr) {
// remove from column_family_set
column_family_set_->RemoveColumnFamily(this);
}
// Drop the reference held on the current version, if any.
if (current_ != nullptr) {
current_->Unref();
}
if (super_version_ != nullptr) {
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
super_version_->db_mutex->Unlock();
local_sv_.reset();
super_version_->db_mutex->Lock();
// With the thread-local cache gone, the reference dropped here is expected
// to be the last one (asserted below).
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
super_version_ = nullptr;
}
if (dummy_versions_ != nullptr) {
// List must be empty
assert(dummy_versions_->TEST_Next() == dummy_versions_);
bool deleted __attribute__((unused)) = dummy_versions_->Unref();
assert(deleted);
}
// Release the active memtable; Unref() returns the memtable to delete, or
// nullptr if it is still referenced (deleting nullptr is a no-op).
if (mem_ != nullptr) {
delete mem_->Unref();
}
// Release the immutable memtable list and free whatever became unreferenced.
autovector<MemTable*> to_delete;
imm_.current()->Unref(&to_delete);
for (MemTable* m : to_delete) {
delete m;
}
}
// Re-evaluates whether writes to this column family must be stopped or
// delayed, and updates write_controller_token_ accordingly. Conditions are
// checked in priority order: too many immutable memtables, level-0 stop
// trigger, level-0 slowdown trigger, hard rate limit, soft rate limit. When
// none applies the token is released. Does nothing without a current version.
void ColumnFamilyData::RecalculateWriteStallConditions(
    const MutableCFOptions& mutable_cf_options) {
  if (current_ == nullptr) {
    return;
  }

  auto* vstorage = current_->storage_info();
  const double score = vstorage->max_compaction_score();
  const int max_level = vstorage->max_compaction_score_level();
  auto controller = column_family_set_->write_controller_;

  if (imm()->size() >= mutable_cf_options.max_write_buffer_number) {
    // Flushes are not keeping up: stop writes entirely.
    write_controller_token_ = controller->GetStopToken();
    internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
    Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
        "[%s] Stopping writes because we have %d immutable memtables "
        "(waiting for flush), max_write_buffer_number is set to %d",
        name_.c_str(), imm()->size(),
        mutable_cf_options.max_write_buffer_number);
    return;
  }

  if (vstorage->NumLevelFiles(0) >=
      mutable_cf_options.level0_stop_writes_trigger) {
    // Too many level-0 files: stop writes entirely.
    write_controller_token_ = controller->GetStopToken();
    internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
    Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
        "[%s] Stopping writes because we have %d level-0 files",
        name_.c_str(), vstorage->NumLevelFiles(0));
    return;
  }

  if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
      vstorage->NumLevelFiles(0) >=
          mutable_cf_options.level0_slowdown_writes_trigger) {
    // Level-0 is filling up: delay writes proportionally.
    uint64_t delay_us =
        SlowdownAmount(vstorage->NumLevelFiles(0),
                       mutable_cf_options.level0_slowdown_writes_trigger,
                       mutable_cf_options.level0_stop_writes_trigger);
    write_controller_token_ = controller->GetDelayToken(delay_us);
    internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, delay_us);
    Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
        "[%s] Stalling writes because we have %d level-0 files (%" PRIu64
        "us)",
        name_.c_str(), vstorage->NumLevelFiles(0), delay_us);
    return;
  }

  if (mutable_cf_options.hard_rate_limit > 1.0 &&
      score > mutable_cf_options.hard_rate_limit) {
    // Compaction score is past the hard limit: apply the fixed delay.
    uint64_t hard_delay_us = 1000;
    write_controller_token_ = controller->GetDelayToken(hard_delay_us);
    internal_stats_->RecordLevelNSlowdown(max_level, hard_delay_us, false);
    Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
        "[%s] Stalling writes because we hit hard limit on level %d. "
        "(%" PRIu64 "us)",
        name_.c_str(), max_level, hard_delay_us);
    return;
  }

  if (mutable_cf_options.soft_rate_limit > 0.0 &&
      score > mutable_cf_options.soft_rate_limit) {
    // Compaction score is between the soft and hard limits.
    uint64_t delay_us = SlowdownAmount(score,
                                       mutable_cf_options.soft_rate_limit,
                                       mutable_cf_options.hard_rate_limit);
    write_controller_token_ = controller->GetDelayToken(delay_us);
    internal_stats_->RecordLevelNSlowdown(max_level, delay_us, true);
    Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
        "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
        "us)",
        name_.c_str(), max_level, delay_us);
    return;
  }

  // No stall condition holds: release any stop/delay token.
  write_controller_token_.reset();
}
// Returns a pointer to the EnvOptions stored in the owning ColumnFamilySet.
const EnvOptions* ColumnFamilyData::soptions() const {
  const EnvOptions& env_options = column_family_set_->env_options_;
  return &env_options;
}
// Records `current_version` as this column family's current version. No
// reference counting is done here.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  this->current_ = current_version;
}
void ColumnFamilyData::CreateNewMemtable(
const MutableCFOptions& mutable_cf_options) {
assert(current_ != nullptr);
if (mem_ != nullptr) {
delete mem_->Unref();
}
mem_ = new MemTable(internal_comparator_, ioptions_, mutable_cf_options);
mem_->Ref();
}
// Asks the compaction picker for a compaction over the current version's
// storage info. A non-null result has its input version set to the current
// version before being returned.
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  auto* picked = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (picked == nullptr) {
    return picked;
  }
  picked->SetInputVersion(current_);
  return picked;
}
// Asks the compaction picker for a compaction over the given key range and
// levels. A non-null result has its input version set to the current version
// before being returned.
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options,
    int input_level, int output_level, uint32_t output_path_id,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end) {
  auto* ranged = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, begin, end, compaction_end);
  if (ranged == nullptr) {
    return ranged;
  }
  ranged->SetInputVersion(current_);
  return ranged;
}
// Returns a SuperVersion on which the caller owns a reference. An extra
// reference is taken on the thread-local SuperVersion, which is then handed
// back to thread-local storage; if that hand-back fails, the surplus
// reference is dropped.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    port::Mutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  const bool handed_back = ReturnThreadLocalSuperVersion(sv);
  if (!handed_back) {
    sv->Unref();
  }
  return sv;
}
// Returns the SuperVersion cached in this thread's local storage, refreshing
// it from super_version_ (under `db_mutex`) when the cached one is obsolete
// or its version number is stale. The thread-local slot is left holding
// kSVInUse.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
port::Mutex* db_mutex) {
SuperVersion* sv = nullptr;
// The SuperVersion is cached in thread local storage to avoid acquiring
// mutex when SuperVersion does not change since the last use. When a new
// SuperVersion is installed, the compaction or flush thread cleans up
// cached SuperVersion in all existing thread local storage. To avoid
// acquiring mutex for this operation, we use atomic Swap() on the thread
// local pointer to guarantee exclusive access. If the thread local pointer
// is being used while a new SuperVersion is installed, the cached
// SuperVersion can become stale. In that case, the background thread would
// have swapped in kSVObsolete. We re-check the value when returning the
// SuperVersion back to thread local, with an atomic compare and swap.
// The superversion will need to be released if detected to be stale.
void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
// Invariant:
// (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
// (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
// should only keep kSVInUse before ReturnThreadLocalSuperVersion call
// (if no Scrape happens).
assert(ptr != SuperVersion::kSVInUse);
sv = static_cast<SuperVersion*>(ptr);
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != super_version_number_.load()) {
// The cached SuperVersion is missing or stale: fetch the installed one.
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
SuperVersion* sv_to_delete = nullptr;
// Drop the stale cached reference; if it was the last one, clean it up
// under the mutex and delete it after the mutex is released.
if (sv && sv->Unref()) {
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
db_mutex->Lock();
// NOTE: underlying resources held by superversion (sst files) might
// not be released until the next background job.
sv->Cleanup();
sv_to_delete = sv;
} else {
db_mutex->Lock();
}
// Both branches above leave db_mutex locked.
sv = super_version_->Ref();
db_mutex->Unlock();
delete sv_to_delete;
}
assert(sv != nullptr);
return sv;
}
// Tries to put `sv` back into this thread's local storage. Returns true when
// the slot still held kSVInUse and `sv` was stored; returns false when the
// slot had been changed in the meantime, in which case `sv` is not stored.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  const bool restored =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), expected);
  if (!restored) {
    // A ThreadLocal scrape happened during this GetImpl call (after the
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  // When restored is true we saw kSVInUse in the ThreadLocal, so the storage
  // has not been altered, no Scrape has happened, and the SuperVersion is
  // still current.
  return restored;
}
// Invokes OnFlushCompleted() on every listener registered in the immutable
// options, passing this column family's name, the flushed file's path, and
// whether the flush triggered a write slowdown or a write stop.
void ColumnFamilyData::NotifyOnFlushCompleted(
    DB* db, const std::string& file_path,
    bool triggered_flush_slowdown,
    bool triggered_flush_stop) {
  // Iterate by reference: copying the listener container and each of its
  // elements on every notification is unnecessary work.
  const auto& listeners = ioptions()->listeners;
  for (const auto& listener : listeners) {
    // Original note, kept as-is: use path 0 as full memtables are first
    // flushed into path 0.
    listener->OnFlushCompleted(db, GetName(), file_path,
                               triggered_flush_slowdown, triggered_flush_stop);
  }
}
// Convenience overload: installs `new_superversion` using this column
// family's own mutable options. Requires `db_mutex` to be held.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, port::Mutex* db_mutex) {
  db_mutex->AssertHeld();
  SuperVersion* replaced =
      InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
  return replaced;
}
// Initializes `new_superversion` from the current memtable, immutable
// memtable list and version, installs it as the active SuperVersion, bumps
// the SuperVersion number, invalidates thread-local caches and recomputes
// write-stall conditions. Returns the previous SuperVersion when its last
// reference was dropped here (already cleaned up; the caller deletes it),
// otherwise nullptr.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, port::Mutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);

  SuperVersion* const previous = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;

  // Reset SuperVersions cached in thread local storage
  ResetThreadLocalSuperVersions();
  RecalculateWriteStallConditions(mutable_cf_options);

  if (previous == nullptr || !previous->Unref()) {
    return nullptr;
  }
  previous->Cleanup();
  return previous;  // will let caller delete outside of mutex
}
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
autovector<void*> sv_ptrs;
local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
for (auto ptr : sv_ptrs) {
assert(ptr);
if (ptr == SuperVersion::kSVInUse) {
continue;
}
auto sv = static_cast<SuperVersion*>(ptr);
if (sv->Unref()) {
sv->Cleanup();
delete sv;
}
}
}
// Parses `options_map` on top of the current mutable options. On success the
// new options replace the current ones and derived options are refreshed; on
// failure nothing changes. Returns the parse status either way.
Status ColumnFamilyData::SetOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions parsed;
  Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                          &parsed);
  if (!s.ok()) {
    return s;
  }
  mutable_cf_options_ = parsed;
  mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  return s;
}
// Creates an empty set of column families. A dummy ColumnFamilyData (id 0,
// empty name, no dummy versions, no owning set) is allocated to act as the
// head of the circular doubly-linked list of column families.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const DBOptions* db_options,
const EnvOptions& env_options,
Cache* table_cache,
WriteController* write_controller)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr,
ColumnFamilyOptions(), db_options,
env_options, nullptr)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
env_options_(env_options),
table_cache_(table_cache),
write_controller_(write_controller),
spin_lock_(ATOMIC_FLAG_INIT) {
// initialize linked list: the dummy head points at itself in both
// directions, which represents an empty list
dummy_cfd_->prev_ = dummy_cfd_;
dummy_cfd_->next_ = dummy_cfd_;
}
// Unreferences and deletes every remaining column family, then the dummy
// list head.
ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    // The cfd destructor removes its own entry from column_family_data_, so
    // the map shrinks on every iteration.
    ColumnFamilyData* victim = column_family_data_.begin()->second;
    victim->Unref();
    delete victim;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}
// Returns the cached default column family (the one created with id 0). It
// must already have been created.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  ColumnFamilyData* const cached_default = default_cfd_cache_;
  assert(cached_default != nullptr);
  return cached_default;
}
// Looks up a column family by ID; returns nullptr when the ID is unknown.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto found = column_family_data_.find(id);
  if (found == column_family_data_.end()) {
    return nullptr;
  }
  return found->second;
}
// Looks up a column family by name; returns nullptr when the name is unknown.
// A known name must always resolve to a live ColumnFamilyData.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  const auto name_entry = column_families_.find(name);
  if (name_entry == column_families_.end()) {
    return nullptr;
  }
  ColumnFamilyData* resolved = GetColumnFamily(name_entry->second);
  assert(resolved != nullptr);
  return resolved;
}
// Advances the largest known column family ID by one and returns the new
// value.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  max_column_family_ += 1;
  return max_column_family_;
}
// Returns the largest column family ID seen so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() {
  const uint32_t largest_id = max_column_family_;
  return largest_id;
}
// Raises the largest known column family ID to `new_max_column_family` if
// that value is larger; never lowers it.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  if (new_max_column_family > max_column_family_) {
    max_column_family_ = new_max_column_family;
  }
}
// Returns how many column families are registered by name.
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  const size_t count = column_families_.size();
  return count;
}
// Creates a column family named `name` with the given `id`, registers it in
// both lookup maps (guarded by the spin lock), appends it to the tail of the
// circular linked list, and remembers it as the default when `id` is 0.
// `name` must not already be registered.
// under a DB mutex
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* created =
      new ColumnFamilyData(id, name, dummy_versions, table_cache_, options,
                           db_options_, env_options_, this);

  Lock();
  column_families_.insert({name, id});
  column_family_data_.insert({id, created});
  Unlock();

  if (max_column_family_ < id) {
    max_column_family_ = id;
  }

  // Splice the new node in just before the dummy head, i.e. at the tail.
  ColumnFamilyData* tail = dummy_cfd_->prev_;
  created->next_ = dummy_cfd_;
  created->prev_ = tail;
  tail->next_ = created;
  dummy_cfd_->prev_ = created;

  if (id == 0) {
    default_cfd_cache_ = created;
  }
  return created;
}
// Acquires the spin lock, busy-waiting until the flag was observed clear.
void ColumnFamilySet::Lock() {
  for (;;) {
    const bool already_held =
        spin_lock_.test_and_set(std::memory_order_acquire);
    if (!already_held) {
      break;
    }
  }
}
// Releases the spin lock by clearing the flag.
void ColumnFamilySet::Unlock() {
  spin_lock_.clear(std::memory_order_release);
}
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
autovector<ColumnFamilyData*> to_delete;
for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
if (cfd->refs_ == 0) {
to_delete.push_back(cfd);
}
}
for (auto cfd : to_delete) {
// this is very rare, so it's not a problem that we do it under a mutex
delete cfd;
}
}
// Removes `cfd` from both the ID map and the name map; the erasures are
// guarded by the spin lock. `cfd` must currently be registered.
// under a DB mutex
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto id_entry = column_family_data_.find(cfd->GetID());
  assert(id_entry != column_family_data_.end());

  Lock();
  column_family_data_.erase(id_entry);
  column_families_.erase(cfd->GetName());
  Unlock();
}
// Positions this object on the column family with the given ID and updates
// the internal handle. Returns true when a live (not dropped) column family
// was found, false otherwise.
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  const bool wants_default = (column_family_id == 0);
  if (!wants_default) {
    // maybe outside of db mutex, should lock
    column_family_set_->Lock();
    current_ = column_family_set_->GetColumnFamily(column_family_id);
    column_family_set_->Unlock();
    // TODO(icanadi) Maybe remove column family from the hash table when it's
    // dropped?
    if (current_ != nullptr && current_->IsDropped()) {
      current_ = nullptr;
    }
  } else {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}
// Returns the log number of the column family selected by the last Seek().
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  const uint64_t log_number = current_->GetLogNumber();
  return log_number;
}
// Returns the active memtable of the column family selected by the last
// Seek().
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  MemTable* active = current_->mem();
  return active;
}
// Returns the options of the column family selected by the last Seek().
const Options* ColumnFamilyMemTablesImpl::GetOptions() const {
  assert(current_ != nullptr);
  const Options* opts = current_->options();
  return opts;
}
// Returns the internal handle, which Seek() keeps pointed at the currently
// selected column family.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  ColumnFamilyHandle* handle = &handle_;
  return handle;
}
// If a column family is selected and its memtable reports that a flush should
// be scheduled, schedules the flush and marks the memtable accordingly.
void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
  if (current_ == nullptr || !current_->mem()->ShouldScheduleFlush()) {
    return;
  }
  flush_scheduler_->ScheduleFlush(current_);
  current_->mem()->MarkFlushScheduled();
}
// Returns the ID behind `column_family`, or 0 when the handle is null.
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return 0;
  }
  auto* impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return impl->GetID();
}
// Returns the user comparator behind `column_family`, or nullptr when the
// handle is null.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return nullptr;
  }
  auto* impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return impl->user_comparator();
}
} // namespace rocksdb