[CF] Rethinking ColumnFamilyHandle and fix to dropping column families

Summary:
The change to the public behavior:
* When opening a DB or creating new column family client gets a ColumnFamilyHandle.
* As long as column family handle is alive, client can do whatever he wants with it, even drop it
* Dropped column family can still be read from (using the column family handle)
* Added a new call CloseColumnFamily(). Client has to close all column families that he has opened before deleting the DB
* As soon as column family is closed, any calls to DB using that column family handle will fail (also any outstanding calls)

Internally:
* Ref-counting ColumnFamilyData
* New thread-safety for ColumnFamilySet
* Dropped column families are now completely dropped and their memory cleaned-up

Test Plan: added some tests to column_family_test

Reviewers: dhruba, haobo, kailiu, sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16101
main
Igor Canadi 11 years ago
parent 8d4db63a2d
commit b06840aa7d
  1. 111
      db/column_family.cc
  2. 97
      db/column_family.h
  3. 64
      db/column_family_test.cc
  4. 12
      db/compaction.cc
  5. 2
      db/db_filesnapshot.cc
  6. 353
      db/db_impl.cc
  7. 53
      db/db_impl.h
  8. 14
      db/db_impl_readonly.cc
  9. 19
      db/db_impl_readonly.h
  10. 2
      db/db_stats_logger.cc
  11. 54
      db/db_test.cc
  12. 20
      db/version_set.cc
  13. 2
      db/version_set.h
  14. 22
      db/write_batch.cc
  15. 7
      db/write_batch_internal.h
  16. 31
      include/rocksdb/column_family.h
  17. 100
      include/rocksdb/db.h
  18. 1
      include/rocksdb/write_batch.h
  19. 43
      include/utilities/stackable_db.h
  20. 3
      table/filter_block.h
  21. 1
      util/ldb_cmd.cc
  22. 8
      utilities/backupable/backupable_db_test.cc
  23. 17
      utilities/ttl/db_ttl.cc
  24. 17
      utilities/ttl/db_ttl.h

@ -13,6 +13,7 @@
#include <string>
#include <algorithm>
#include "db/db_impl.h"
#include "db/version_set.h"
#include "db/internal_stats.h"
#include "db/compaction_picker.h"
@ -22,6 +23,27 @@
namespace rocksdb {
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(ColumnFamilyData* cfd,
DBImpl* db, port::Mutex* mutex)
: cfd_(cfd), db_(db), mutex_(mutex) {
if (cfd_ != nullptr) {
cfd_->Ref();
}
}
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
if (cfd_ != nullptr) {
DBImpl::DeletionState deletion_state;
mutex_->Lock();
if (cfd_->Unref()) {
delete cfd_;
}
db_->FindObsoleteFiles(deletion_state, false, true);
mutex_->Unlock();
db_->PurgeObsoleteFiles(deletion_state);
}
}
namespace {
// Fix user-supplied options to be reasonable
template <class T, class V>
@ -134,11 +156,14 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
Version* dummy_versions, Cache* table_cache,
const ColumnFamilyOptions& options,
const DBOptions* db_options,
const EnvOptions& storage_options)
const EnvOptions& storage_options,
ColumnFamilySet* column_family_set)
: id_(id),
name_(name),
dummy_versions_(dummy_versions),
current_(nullptr),
refs_(0),
dropped_(false),
internal_comparator_(options.comparator),
internal_filter_policy_(options.filter_policy),
options_(SanitizeOptions(&internal_comparator_, &internal_filter_policy_,
@ -151,7 +176,10 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
next_(nullptr),
prev_(nullptr),
log_number_(0),
need_slowdown_for_num_level0_files_(false) {
need_slowdown_for_num_level0_files_(false),
column_family_set_(column_family_set) {
Ref();
// if dummy_versions is nullptr, then this is a dummy column family.
if (dummy_versions != nullptr) {
internal_stats_.reset(new InternalStats(options.num_levels, db_options->env,
@ -172,7 +200,15 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
}
}
// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
assert(refs_ == 0);
// remove from linked list
auto prev = prev_;
auto next = next_;
prev->next_ = next;
next->prev_ = prev;
if (super_version_ != nullptr) {
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
@ -180,6 +216,17 @@ ColumnFamilyData::~ColumnFamilyData() {
super_version_->Cleanup();
delete super_version_;
}
// it's nullptr for dummy CFD
if (column_family_set_ != nullptr) {
// remove from column_family_set
column_family_set_->DropColumnFamily(this);
}
if (current_ != nullptr) {
current_->Unref();
}
if (dummy_versions_ != nullptr) {
// List must be empty
assert(dummy_versions_->next_ == dummy_versions_);
@ -248,24 +295,25 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(dbname, 0, "", nullptr, nullptr,
ColumnFamilyOptions(), db_options,
storage_options_)),
storage_options_, nullptr)),
db_name_(dbname),
db_options_(db_options),
storage_options_(storage_options),
table_cache_(table_cache),
spin_lock_(ATOMIC_FLAG_INIT) {
// initialize linked list
dummy_cfd_->prev_.store(dummy_cfd_);
dummy_cfd_->next_.store(dummy_cfd_);
dummy_cfd_->prev_ = dummy_cfd_;
dummy_cfd_->next_ = dummy_cfd_;
}
ColumnFamilySet::~ColumnFamilySet() {
for (auto& cfd : column_family_data_) {
delete cfd.second;
}
for (auto& cfd : droppped_column_families_) {
while (column_family_data_.size() > 0) {
// cfd destructor will delete itself from column_family_data_
auto cfd = column_family_data_.begin()->second;
cfd->Unref();
delete cfd;
}
dummy_cfd_->Unref();
delete dummy_cfd_;
}
@ -303,39 +351,36 @@ uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
return ++max_column_family_;
}
// under a DB mutex
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
const ColumnFamilyOptions& options) {
assert(column_families_.find(name) == column_families_.end());
column_families_.insert({name, id});
ColumnFamilyData* new_cfd =
new ColumnFamilyData(db_name_, id, name, dummy_versions, table_cache_,
options, db_options_, storage_options_);
options, db_options_, storage_options_, this);
Lock();
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
Unlock();
max_column_family_ = std::max(max_column_family_, id);
// add to linked list
new_cfd->next_.store(dummy_cfd_);
auto prev = dummy_cfd_->prev_.load();
new_cfd->prev_.store(prev);
prev->next_.store(new_cfd);
dummy_cfd_->prev_.store(new_cfd);
new_cfd->next_ = dummy_cfd_;
auto prev = dummy_cfd_->prev_;
new_cfd->prev_ = prev;
prev->next_ = new_cfd;
dummy_cfd_->prev_ = new_cfd;
return new_cfd;
}
void ColumnFamilySet::DropColumnFamily(uint32_t id) {
assert(id != 0);
auto cfd_iter = column_family_data_.find(id);
// under a DB mutex
void ColumnFamilySet::DropColumnFamily(ColumnFamilyData* cfd) {
auto cfd_iter = column_family_data_.find(cfd->GetID());
assert(cfd_iter != column_family_data_.end());
auto cfd = cfd_iter->second;
column_families_.erase(cfd->GetName());
cfd->current()->Unref();
droppped_column_families_.push_back(cfd);
Lock();
column_family_data_.erase(cfd_iter);
// remove from linked list
auto prev = cfd->prev_.load();
auto next = cfd->next_.load();
prev->next_.store(next);
next->prev_.store(prev);
column_families_.erase(cfd->GetName());
Unlock();
}
void ColumnFamilySet::Lock() {
@ -347,8 +392,11 @@ void ColumnFamilySet::Lock() {
void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); }
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
// maybe outside of db mutex, should lock
column_family_set_->Lock();
current_ = column_family_set_->GetColumnFamily(column_family_id);
handle_.id = column_family_id;
column_family_set_->Unlock();
handle_.SetCFD(current_);
return current_ != nullptr;
}
@ -367,10 +415,9 @@ const Options* ColumnFamilyMemTablesImpl::GetFullOptions() const {
return current_->full_options();
}
const ColumnFamilyHandle& ColumnFamilyMemTablesImpl::GetColumnFamilyHandle()
const {
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
assert(current_ != nullptr);
return handle_;
return &handle_;
}
} // namespace rocksdb

@ -30,6 +30,35 @@ class CompactionPicker;
class Compaction;
class InternalKey;
class InternalStats;
class ColumnFamilyData;
class DBImpl;
class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
public:
// create while holding the mutex
ColumnFamilyHandleImpl(ColumnFamilyData* cfd, DBImpl* db, port::Mutex* mutex);
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
private:
ColumnFamilyData* cfd_;
DBImpl* db_;
port::Mutex* mutex_;
};
// does not ref-count cfd_
class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl {
public:
ColumnFamilyHandleInternal()
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr) {}
void SetCFD(ColumnFamilyData* cfd) { internal_cfd_ = cfd; }
virtual ColumnFamilyData* cfd() const override { return internal_cfd_; }
private:
ColumnFamilyData* internal_cfd_;
};
// holds references to memtable, all immutable memtables and version
struct SuperVersion {
@ -63,12 +92,35 @@ extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
const InternalFilterPolicy* ipolicy,
const ColumnFamilyOptions& src);
class ColumnFamilySet;
// column family metadata. not thread-safe. should be protected by db_mutex
class ColumnFamilyData {
public:
~ColumnFamilyData();
uint32_t GetID() const { return id_; }
const std::string& GetName() { return name_; }
// DB mutex held for all these
void Ref() { ++refs_; }
// will just decrease reference count to 0, but will not delete it. returns
// true if the ref count was decreased to zero and needs to be cleaned up by
// the caller
bool Unref() {
assert(refs_ > 0);
return --refs_ == 0;
}
bool Dead() { return refs_ == 0; }
// SetDropped() and IsDropped() are thread-safe
void SetDropped() {
// can't drop default CF
assert(id_ != 0);
dropped_.store(true);
}
bool IsDropped() const { return dropped_.load(); }
int NumberLevels() const { return options_.num_levels; }
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
@ -126,16 +178,19 @@ class ColumnFamilyData {
const std::string& name, Version* dummy_versions,
Cache* table_cache, const ColumnFamilyOptions& options,
const DBOptions* db_options,
const EnvOptions& storage_options);
~ColumnFamilyData();
const EnvOptions& storage_options,
ColumnFamilySet* column_family_set);
ColumnFamilyData* next() { return next_.load(); }
ColumnFamilyData* next() { return next_; }
uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions->prev_
int refs_; // outstanding references to ColumnFamilyData
std::atomic<bool> dropped_; // true if client dropped it
const InternalKeyComparator internal_comparator_;
const InternalFilterPolicy internal_filter_policy_;
@ -157,8 +212,8 @@ class ColumnFamilyData {
// pointers for a circular linked list. we use it to support iterations
// that can be concurrent with writes
std::atomic<ColumnFamilyData*> next_;
std::atomic<ColumnFamilyData*> prev_;
ColumnFamilyData* next_;
ColumnFamilyData* prev_;
// This is the earliest log file number that contains data from this
// Column Family. All earlier log files must be ignored and not
@ -172,6 +227,8 @@ class ColumnFamilyData {
// An object that keeps all the compaction stats
// and picks the next compaction
std::unique_ptr<CompactionPicker> compaction_picker_;
ColumnFamilySet* column_family_set_;
};
// Thread safe only for reading without a writer. All access should be
@ -183,7 +240,10 @@ class ColumnFamilySet {
explicit iterator(ColumnFamilyData* cfd)
: current_(cfd) {}
iterator& operator++() {
// dummy is never dead, so this will never be infinite
do {
current_ = current_->next();
} while (current_->Dead());
return *this;
}
bool operator!=(const iterator& other) {
@ -216,26 +276,31 @@ class ColumnFamilySet {
ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
Version* dummy_version,
const ColumnFamilyOptions& options);
void DropColumnFamily(uint32_t id);
void DropColumnFamily(ColumnFamilyData* cfd);
iterator begin() { return iterator(dummy_cfd_->next()); }
iterator end() { return iterator(dummy_cfd_); }
// ColumnFamilySet has interesting thread-safety requirements
// * CreateColumnFamily() or DropColumnFamily() -- need to Lock() and also
// execute inside of DB mutex
// * Iterate -- without any locks
// * GetDefault(), GetColumnFamily(), Exists(), GetID(),
// GetNextColumnFamilyID() -- either inside of DB mutex or call Lock()
// * CreateColumnFamily() or DropColumnFamily() -- need to protect by DB
// mutex. Inside, column_family_data_ and column_families_ will be protected
// by Lock() and Unlock()
// * Iterate -- hold DB mutex, but you can release it in the body of
// iteration. If you release DB mutex in body, reference the column
// family before the mutex and unreference after you unlock, since the column
// family might get dropped when you release the DB mutex.
// * GetDefault(), GetColumnFamily(), Exists(), GetID() -- either inside of DB
// mutex or call Lock()
// * GetNextColumnFamilyID() -- inside of DB mutex
void Lock();
void Unlock();
private:
// when mutating: 1. DB mutex locked first, 2. spinlock locked second
// when reading, either: 1. lock DB mutex, or 2. lock spinlock
// (if both, respect the ordering to avoid deadlock!)
std::unordered_map<std::string, uint32_t> column_families_;
std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
// we need to keep them alive because we still can't control the lifetime of
// all of column family data members (options for example)
std::vector<ColumnFamilyData*> droppped_column_families_;
uint32_t max_column_family_;
ColumnFamilyData* dummy_cfd_;
@ -266,12 +331,12 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
virtual const Options* GetFullOptions() const override;
// Returns column family handle for the selected column family
virtual const ColumnFamilyHandle& GetColumnFamilyHandle() const override;
virtual ColumnFamilyHandle* GetColumnFamilyHandle() override;
private:
ColumnFamilySet* column_family_set_;
ColumnFamilyData* current_;
ColumnFamilyHandle handle_;
ColumnFamilyHandleInternal handle_;
};
} // namespace rocksdb

@ -31,6 +31,10 @@ class ColumnFamilyTest {
}
void Close() {
for (auto h : handles_) {
delete h;
}
handles_.clear();
delete db_;
db_ = nullptr;
}
@ -45,6 +49,10 @@ class ColumnFamilyTest {
}
void Destroy() {
for (auto h : handles_) {
delete h;
}
handles_.clear();
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_)));
@ -59,6 +67,14 @@ class ColumnFamilyTest {
}
}
void DropColumnFamilies(const vector<int>& cfs) {
for (auto cf : cfs) {
ASSERT_OK(db_->DropColumnFamily(handles_[cf]));
delete handles_[cf];
handles_[cf] = nullptr;
}
}
Status Put(int cf, const string& key, const string& value) {
return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value));
}
@ -111,6 +127,12 @@ class ColumnFamilyTest {
return result;
}
int CountLiveFiles(int cf) {
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
return static_cast<int>(metadata.size());
}
// Do n memtable flushes, each of which produces an sstable
// covering the range [small,large].
void MakeTables(int cf, int n, const string& small,
@ -146,7 +168,7 @@ class ColumnFamilyTest {
ASSERT_OK(destfile->Close());
}
vector<ColumnFamilyHandle> handles_;
vector<ColumnFamilyHandle*> handles_;
ColumnFamilyOptions column_family_options_;
DBOptions db_options_;
string dbname_;
@ -156,16 +178,9 @@ class ColumnFamilyTest {
TEST(ColumnFamilyTest, AddDrop) {
ASSERT_OK(Open({"default"}));
ColumnFamilyHandle handles[4];
ASSERT_OK(
db_->CreateColumnFamily(column_family_options_, "one", &handles[0]));
ASSERT_OK(
db_->CreateColumnFamily(column_family_options_, "two", &handles[1]));
ASSERT_OK(
db_->CreateColumnFamily(column_family_options_, "three", &handles[2]));
ASSERT_OK(db_->DropColumnFamily(handles[1]));
ASSERT_OK(
db_->CreateColumnFamily(column_family_options_, "four", &handles[3]));
CreateColumnFamilies({"one", "two", "three"});
DropColumnFamilies({2});
CreateColumnFamilies({"four"});
Close();
ASSERT_TRUE(Open({"default"}).IsInvalidArgument());
ASSERT_OK(Open({"default", "one", "three", "four"}));
@ -177,6 +192,33 @@ TEST(ColumnFamilyTest, AddDrop) {
ASSERT_TRUE(families == vector<string>({"default", "four", "one", "three"}));
}
TEST(ColumnFamilyTest, DropTest) {
// first iteration - dont reopen DB before dropping
// second iteration - reopen DB before dropping
for (int iter = 0; iter < 2; ++iter) {
ASSERT_OK(Open({"default"}));
CreateColumnFamilies({"pikachu"});
Close();
ASSERT_OK(Open({"default", "pikachu"}));
for (int i = 0; i < 100; ++i) {
ASSERT_OK(Put(1, std::to_string(i), "bar" + std::to_string(i)));
}
ASSERT_OK(Flush(1));
if (iter == 1) {
Close();
ASSERT_OK(Open({"default", "pikachu"}));
}
ASSERT_EQ("bar1", Get(1, "1"));
ASSERT_EQ(CountLiveFiles(1), 1);
DropColumnFamilies({1});
// make sure that all files are deleted when we drop the column family
ASSERT_EQ(CountLiveFiles(1), 0);
Destroy();
}
}
TEST(ColumnFamilyTest, ReadWrite) {
ASSERT_OK(Open({"default"}));
CreateColumnFamilies({"one", "two"});

@ -43,6 +43,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
is_full_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels_)) {
cfd_->Ref();
input_version_->Ref();
edit_ = new VersionEdit();
edit_->SetColumnFamily(cfd_->GetID());
@ -56,6 +57,11 @@ Compaction::~Compaction() {
if (input_version_ != nullptr) {
input_version_->Unref();
}
if (cfd_ != nullptr) {
if (cfd_->Unref()) {
delete cfd_;
}
}
}
bool Compaction::IsTrivialMove() const {
@ -168,6 +174,12 @@ void Compaction::ReleaseInputs() {
input_version_->Unref();
input_version_ = nullptr;
}
if (cfd_ != nullptr) {
if (cfd_->Unref()) {
delete cfd_;
}
cfd_ = nullptr;
}
}
void Compaction::ReleaseCompactionFiles(Status status) {

@ -74,7 +74,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
// Make a set of all of the live *.sst files
std::set<uint64_t> live;
default_cfd_->current()->AddLiveFiles(&live);
default_cf_handle_->cfd()->current()->AddLiveFiles(&live);
ret.clear();
ret.reserve(live.size() + 2); //*.sst + CURRENT + MANIFEST

@ -42,7 +42,6 @@
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/column_family.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
@ -218,6 +217,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
shutting_down_(nullptr),
bg_cv_(&mutex_),
logfile_number_(0),
default_cf_handle_(nullptr),
tmp_batch_(),
bg_compaction_scheduled_(0),
bg_manual_only_(0),
@ -270,14 +270,24 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
DBImpl::~DBImpl() {
// Wait for background work to finish
mutex_.Lock();
if (flush_on_destroy_) {
autovector<ColumnFamilyData*> to_delete;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->GetFirstSequenceNumber() != 0) {
cfd->Ref();
mutex_.Unlock();
FlushMemTable(cfd, FlushOptions());
mutex_.Lock();
if (cfd->Unref()) {
to_delete.push_back(cfd);
}
}
}
mutex_.Lock();
for (auto cfd : to_delete) {
delete cfd;
}
}
shutting_down_.Release_Store(this); // Any non-nullptr value is ok
while (bg_compaction_scheduled_ ||
bg_flush_scheduled_ ||
@ -285,6 +295,10 @@ DBImpl::~DBImpl() {
bg_cv_.Wait();
}
mutex_.Unlock();
if (default_cf_handle_ != nullptr) {
// we need to delete handle outside of lock because it does its own locking
delete default_cf_handle_;
}
if (db_lock_ != nullptr) {
env_->UnlockFile(db_lock_);
@ -816,7 +830,8 @@ Status DBImpl::Recover(
Status s = versions_->Recover(column_families);
if (s.ok()) {
SequenceNumber max_sequence(0);
default_cfd_ = versions_->GetColumnFamilySet()->GetDefault();
default_cf_handle_ = new ColumnFamilyHandleImpl(
versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
@ -891,6 +906,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
mutex_.AssertHeld();
std::unordered_map<int, VersionEdit> version_edits;
// no need to refcount because iteration is under mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
@ -934,7 +950,6 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
}
WriteBatchInternal::SetContents(&batch, record);
// No need to lock ColumnFamilySet here since its under a DB mutex
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), log_number);
@ -950,6 +965,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
}
if (!read_only) {
// no need to refcount since client still doesn't have access
// to the DB and can not drop column families while we iterate
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->ApproximateMemoryUsage() >
cfd->options()->write_buffer_size) {
@ -973,6 +990,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
}
if (!read_only) {
// no need to refcount since client still doesn't have access
// to the DB and can not drop column families while we iterate
for (auto cfd : *versions_->GetColumnFamilySet()) {
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
@ -1198,10 +1217,8 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table(cfd, mems, edit, &file_number);
if (s.ok() && shutting_down_.Acquire_Load()) {
s = Status::IOError(
"Database shutdown started during memtable compaction"
);
if (s.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) {
s = Status::IOError("Column family closed during memtable flush");
}
// Replace immutable memtable with the generated Table
@ -1229,15 +1246,11 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
return s;
}
Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool reduce_level, int target_level) {
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
mutex_.Unlock();
// this is asserting because client calling DB methods with undefined
// ColumnFamilyHandle is undefined behavior.
assert(cfd != nullptr);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
Status s = FlushMemTable(cfd, FlushOptions());
if (!s.ok()) {
@ -1367,38 +1380,25 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
return status;
}
int DBImpl::NumberLevels(const ColumnFamilyHandle& column_family) {
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
mutex_.Unlock();
assert(cfd != nullptr);
return cfd->NumberLevels();
int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
return cfh->cfd()->NumberLevels();
}
int DBImpl::MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) {
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
mutex_.Unlock();
assert(cfd != nullptr);
return cfd->options()->max_mem_compaction_level;
int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
return cfh->cfd()->options()->max_mem_compaction_level;
}
int DBImpl::Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) {
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
mutex_.Unlock();
assert(cfd != nullptr);
return cfd->options()->level0_stop_writes_trigger;
int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
return cfh->cfd()->options()->level0_stop_writes_trigger;
}
Status DBImpl::Flush(const FlushOptions& options,
const ColumnFamilyHandle& column_family) {
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
mutex_.Unlock();
assert(cfd != nullptr);
return FlushMemTable(cfd, options);
ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
return FlushMemTable(cfh->cfd(), options);
}
SequenceNumber DBImpl::GetLatestSequenceNumber() const {
@ -1666,11 +1666,12 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
Status DBImpl::TEST_CompactRange(int level,
const Slice* begin,
const Slice* end) {
auto default_cfd = default_cf_handle_->cfd();
int output_level =
(default_cfd_->options()->compaction_style == kCompactionStyleUniversal)
(default_cfd->options()->compaction_style == kCompactionStyleUniversal)
? level
: level + 1;
return RunManualCompaction(default_cfd_, level, output_level, begin, end);
return RunManualCompaction(default_cfd, level, output_level, begin, end);
}
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
@ -1698,11 +1699,11 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
}
Status DBImpl::TEST_FlushMemTable() {
return FlushMemTable(default_cfd_, FlushOptions());
return FlushMemTable(default_cf_handle_->cfd(), FlushOptions());
}
Status DBImpl::TEST_WaitForFlushMemTable() {
return WaitForFlushMemTable(default_cfd_);
return WaitForFlushMemTable(default_cf_handle_->cfd());
}
Status DBImpl::TEST_WaitForCompact() {
@ -1728,6 +1729,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
// DB is being deleted; no more background compactions
} else {
bool is_flush_pending = false;
// no need to refcount since we're under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->imm()->IsFlushPending()) {
is_flush_pending = true;
@ -1744,6 +1746,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
}
}
bool is_compaction_needed = false;
// no need to refcount since we're under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->current()->NeedsCompaction()) {
is_compaction_needed = true;
@ -1774,17 +1777,38 @@ void DBImpl::BGWorkCompaction(void* db) {
Status DBImpl::BackgroundFlush(bool* madeProgress,
DeletionState& deletion_state) {
Status stat;
mutex_.AssertHeld();
// call_status is failure if at least one flush was a failure. even if
// flushing one column family reports a failure, we will continue flushing
// other column families. however, call_status will be a failure in that case.
Status call_status;
autovector<ColumnFamilyData*> to_delete;
// refcounting in iteration
for (auto cfd : *versions_->GetColumnFamilySet()) {
while (stat.ok() && cfd->imm()->IsFlushPending()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
Status flush_status;
while (flush_status.ok() && cfd->imm()->IsFlushPending()) {
Log(options_.info_log,
"BackgroundCallFlush doing FlushMemTableToOutputFile with column "
"family %u, flush slots available %d",
cfd->GetID(), options_.max_background_flushes - bg_flush_scheduled_);
stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state);
flush_status =
FlushMemTableToOutputFile(cfd, madeProgress, deletion_state);
}
if (call_status.ok() && !flush_status.ok()) {
call_status = flush_status;
}
if (cfd->Unref()) {
to_delete.push_back(cfd);
}
}
for (auto cfd : to_delete) {
delete cfd;
}
return stat;
return call_status;
}
void DBImpl::BackgroundCallFlush() {
@ -1835,7 +1859,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() {
uint64_t DBImpl::TEST_GetLevel0TotalSize() {
MutexLock l(&mutex_);
return default_cfd_->current()->NumLevelBytes(0);
return default_cf_handle_->cfd()->current()->NumLevelBytes(0);
}
void DBImpl::BackgroundCallCompaction() {
@ -1921,8 +1945,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
? "(end)"
: manual_end->DebugString().c_str()));
} else {
// no need to refcount in iteration since it's always under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->options()->disable_auto_compactions) {
if (!cfd->options()->disable_auto_compactions && !cfd->IsDropped()) {
c.reset(cfd->PickCompaction());
if (c != nullptr) {
// update statistics
@ -2302,7 +2327,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
compaction_filter = compaction_filter_from_factory.get();
}
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
while (input->Valid() && !shutting_down_.Acquire_Load() &&
!cfd->IsDropped()) {
Slice key = input->key();
Slice value = input->value();
@ -2546,8 +2572,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
}
if (status.ok() && shutting_down_.Acquire_Load()) {
status = Status::IOError("Database shutdown started during compaction");
if (status.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) {
status = Status::IOError("Column family closing started during compaction");
}
if (status.ok() && compact->builder != nullptr) {
status = FinishCompactionOutputFile(compact, input.get());
@ -2638,8 +2664,7 @@ struct IterState {
static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
DBImpl::DeletionState deletion_state(state->db->GetOptions().
max_write_buffer_number);
DBImpl::DeletionState deletion_state;
bool need_cleanup = state->super_version->Unref();
if (need_cleanup) {
@ -2677,11 +2702,17 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
return internal_iter;
}
ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
return default_cf_handle_;
}
Iterator* DBImpl::TEST_NewInternalIterator() {
mutex_.Lock();
SuperVersion* super_version = default_cfd_->GetSuperVersion()->Ref();
SuperVersion* super_version =
default_cf_handle_->cfd()->GetSuperVersion()->Ref();
mutex_.Unlock();
return NewInternalIterator(ReadOptions(), default_cfd_, super_version);
return NewInternalIterator(ReadOptions(), default_cf_handle_->cfd(),
super_version);
}
std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
@ -2725,11 +2756,11 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
MutexLock l(&mutex_);
return default_cfd_->current()->MaxNextLevelOverlappingBytes();
return default_cf_handle_->cfd()->current()->MaxNextLevelOverlappingBytes();
}
Status DBImpl::Get(const ReadOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) {
return GetImpl(options, column_family, key, value);
}
@ -2762,18 +2793,16 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
}
Status DBImpl::GetImpl(const ReadOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value,
bool* value_found) {
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) {
StopWatch sw(env_, options_.statistics.get(), DB_GET, false);
StopWatchNano snapshot_timer(env_, false);
StartPerfTimer(&snapshot_timer);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
// this is asserting because client calling DB methods with undefined
// ColumnFamilyHandle is undefined behavior.
assert(cfd != nullptr);
SuperVersion* get_version = cfd->GetSuperVersion()->Ref();
mutex_.Unlock();
@ -2851,7 +2880,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
std::vector<Status> DBImpl::MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false);
@ -2869,8 +2898,12 @@ std::vector<Status> DBImpl::MultiGet(
std::unordered_map<uint32_t, MultiGetColumnFamilyData*> multiget_cf_data;
// fill up and allocate outside of mutex
for (auto cf : column_family) {
if (multiget_cf_data.find(cf.id) == multiget_cf_data.end()) {
multiget_cf_data.insert({cf.id, new MultiGetColumnFamilyData()});
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf);
auto cfd = cfh->cfd();
if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
auto mgcfd = new MultiGetColumnFamilyData();
mgcfd->cfd = cfd;
multiget_cf_data.insert({cfd->GetID(), mgcfd});
}
}
@ -2881,10 +2914,8 @@ std::vector<Status> DBImpl::MultiGet(
snapshot = versions_->LastSequence();
}
for (auto mgd_iter : multiget_cf_data) {
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(mgd_iter.first);
assert(cfd != nullptr);
mgd_iter.second->cfd = cfd;
mgd_iter.second->super_version = cfd->GetSuperVersion()->Ref();
mgd_iter.second->super_version =
mgd_iter.second->cfd->GetSuperVersion()->Ref();
}
mutex_.Unlock();
@ -2910,7 +2941,8 @@ std::vector<Status> DBImpl::MultiGet(
std::string* value = &(*values)[i];
LookupKey lkey(keys[i], snapshot);
auto mgd_iter = multiget_cf_data.find(column_family[i].id);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
assert(mgd_iter != multiget_cf_data.end());
auto mgd = mgd_iter->second;
auto super_version = mgd->super_version;
@ -2974,64 +3006,61 @@ std::vector<Status> DBImpl::MultiGet(
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle* handle) {
// whenever we are writing to column family set, we have to lock
versions_->GetColumnFamilySet()->Lock();
ColumnFamilyHandle** handle) {
mutex_.Lock();
if (versions_->GetColumnFamilySet()->Exists(column_family_name)) {
return Status::InvalidArgument("Column family already exists");
}
VersionEdit edit;
edit.AddColumnFamily(column_family_name);
handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
edit.SetColumnFamily(handle->id);
uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
edit.SetColumnFamily(new_id);
mutex_.Lock();
Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
Status s = versions_->LogAndApply(default_cf_handle_->cfd(), &edit, &mutex_);
if (s.ok()) {
// add to internal data structures
versions_->CreateColumnFamily(options, &edit);
auto cfd = versions_->CreateColumnFamily(options, &edit);
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
}
mutex_.Unlock();
versions_->GetColumnFamilySet()->Unlock();
Log(options_.info_log, "Created column family \"%s\"",
column_family_name.c_str());
return s;
}
Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
if (column_family.id == 0) {
Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
if (cfd->GetID() == 0) {
return Status::InvalidArgument("Can't drop default column family");
}
// whenever we are writing to column family set, we have to lock
versions_->GetColumnFamilySet()->Lock();
if (!versions_->GetColumnFamilySet()->Exists(column_family.id)) {
return Status::NotFound("Column family not found");
}
Log(options_.info_log, "Dropping column family with id %u\n", cfd->GetID());
VersionEdit edit;
edit.DropColumnFamily();
edit.SetColumnFamily(column_family.id);
mutex_.Lock();
Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
edit.SetColumnFamily(cfd->GetID());
MutexLock l(&mutex_);
if (cfd->IsDropped()) {
return Status::InvalidArgument("Column family already dropped!\n");
}
Status s = versions_->LogAndApply(cfd, &edit, &mutex_);
if (s.ok()) {
// remove from internal data structures
versions_->DropColumnFamily(&edit);
cfd->SetDropped();
// DB is holding one reference to each column family when it's alive,
// need to drop it now
if (cfd->Unref()) {
delete cfd;
}
}
versions_->GetColumnFamilySet()->Unlock();
DeletionState deletion_state;
FindObsoleteFiles(deletion_state, false, true);
mutex_.Unlock();
PurgeObsoleteFiles(deletion_state);
Log(options_.info_log, "Dropped column family with id %u\n",
column_family.id);
return s;
}
bool DBImpl::KeyMayExist(const ReadOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value,
bool* value_found) {
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) {
if (value_found != nullptr) {
// falsify later if key-may-exist but can't fetch value
*value_found = true;
@ -3047,12 +3076,12 @@ bool DBImpl::KeyMayExist(const ReadOptions& options,
}
Iterator* DBImpl::NewIterator(const ReadOptions& options,
const ColumnFamilyHandle& column_family) {
ColumnFamilyHandle* column_family) {
SequenceNumber latest_snapshot = 0;
SuperVersion* super_version = nullptr;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
assert(cfd != nullptr);
if (!options.tailing) {
super_version = cfd->GetSuperVersion()->Ref();
latest_snapshot = versions_->LastSequence();
@ -3083,7 +3112,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
Status DBImpl::NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
std::vector<Iterator*>* iterators) {
// TODO
return Status::NotSupported("Not yet!");
@ -3100,20 +3129,15 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
}
// Convenience methods
Status DBImpl::Put(const WriteOptions& o,
const ColumnFamilyHandle& column_family, const Slice& key,
const Slice& val) {
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
return DB::Put(o, column_family, key, val);
}
Status DBImpl::Merge(const WriteOptions& o,
const ColumnFamilyHandle& column_family, const Slice& key,
const Slice& val) {
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
mutex_.Unlock();
assert(cfd != nullptr);
if (!cfd->options()->merge_operator) {
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
if (!cfh->cfd()->options()->merge_operator) {
return Status::NotSupported("Provide a merge_operator when opening DB");
} else {
return DB::Merge(o, column_family, key, val);
@ -3121,8 +3145,7 @@ Status DBImpl::Merge(const WriteOptions& o,
}
Status DBImpl::Delete(const WriteOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key) {
ColumnFamilyHandle* column_family, const Slice& key) {
return DB::Delete(options, column_family, key);
}
@ -3155,13 +3178,22 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
Status status;
autovector<ColumnFamilyData*> to_delete;
// refcounting cfd in iteration
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->Ref();
// May temporarily unlock and wait.
status = MakeRoomForWrite(cfd, my_batch == nullptr);
if (cfd->Unref()) {
to_delete.push_back(cfd);
}
if (!status.ok()) {
break;
}
}
for (auto cfd : to_delete) {
delete cfd;
}
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions
@ -3221,13 +3253,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (status.ok()) {
StopWatchNano write_memtable_timer(env_, false);
// reading the column family set outside of DB mutex -- should lock
versions_->GetColumnFamilySet()->Lock();
StartPerfTimer(&write_memtable_timer);
status = WriteBatchInternal::InsertInto(
updates, column_family_memtables_.get(), 0, this, false);
BumpPerfTime(&perf_context.write_memtable_time, &write_memtable_timer);
versions_->GetColumnFamilySet()->Unlock();
if (!status.ok()) {
// Panic for in-memory corruptions
@ -3536,29 +3565,28 @@ Env* DBImpl::GetEnv() const {
return env_;
}
const Options& DBImpl::GetOptions(const ColumnFamilyHandle& column_family)
const {
return *default_cfd_->full_options();
const Options& DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
return *cfh->cfd()->full_options();
}
bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
const Slice& property, std::string* value) {
value->clear();
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
MutexLock l(&mutex_);
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
assert(cfd != nullptr);
return cfd->internal_stats()->GetProperty(property, value, cfd);
}
void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family,
void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,
const Range* range, int n, uint64_t* sizes) {
// TODO(opt): better implementation
Version* v;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
{
MutexLock l(&mutex_);
auto cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
assert(cfd != nullptr);
v = cfd->current();
v->Ref();
}
@ -3654,9 +3682,9 @@ Status DBImpl::DeleteFile(std::string name) {
return status;
}
void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata) {
void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
MutexLock l(&mutex_);
return versions_->GetLiveFilesMetaData(metadata);
versions_->GetLiveFilesMetaData(metadata);
}
Status DBImpl::GetDbIdentity(std::string& identity) {
@ -3688,38 +3716,40 @@ Status DBImpl::GetDbIdentity(std::string& identity) {
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const ColumnFamilyHandle& column_family,
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
// Pre-allocate size of write batch conservatively.
// 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
// and we allocate 11 extra bytes for key length, as well as value length.
WriteBatch batch(key.size() + value.size() + 24);
batch.Put(column_family.id, key, value);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
batch.Put(cfh->cfd()->GetID(), key, value);
return Write(opt, &batch);
}
Status DB::Delete(const WriteOptions& opt,
const ColumnFamilyHandle& column_family, const Slice& key) {
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key) {
WriteBatch batch;
batch.Delete(column_family.id, key);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
batch.Delete(cfh->cfd()->GetID(), key);
return Write(opt, &batch);
}
Status DB::Merge(const WriteOptions& opt,
const ColumnFamilyHandle& column_family, const Slice& key,
const Slice& value) {
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Merge(column_family.id, key, value);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
batch.Merge(cfh->cfd()->GetID(), key, value);
return Write(opt, &batch);
}
// Default implementation -- returns not supported status
Status DB::CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle* handle) {
ColumnFamilyHandle** handle) {
return Status::NotSupported("");
}
Status DB::DropColumnFamily(const ColumnFamilyHandle& column_family) {
Status DB::DropColumnFamily(ColumnFamilyHandle* column_family) {
return Status::NotSupported("");
}
@ -3731,14 +3761,22 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(default_column_family_name, cf_options));
std::vector<ColumnFamilyHandle> handles;
return DB::Open(db_options, dbname, column_families, &handles, dbptr);
std::vector<ColumnFamilyHandle*> handles;
Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
if (s.ok()) {
assert(handles.size() == 1);
// i can delete the handle since DBImpl is always holding a reference to
// default column family
delete handles[0];
}
return s;
}
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle>* handles, DB** dbptr) {
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
*dbptr = nullptr;
handles->clear();
EnvOptions soptions;
size_t max_write_buffer_size = 0;
@ -3784,20 +3822,22 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
// that we used by calling impl->versions_->NewFileNumber()
// The used log number are already written to manifest in RecoverLogFile()
// method
s = impl->versions_->LogAndApply(impl->default_cfd_, &edit, &impl->mutex_,
s = impl->versions_->LogAndApply(impl->default_cf_handle_->cfd(), &edit,
&impl->mutex_,
impl->db_directory_.get());
}
if (s.ok()) {
// set column family handles
handles->clear();
for (auto cf : column_families) {
if (!impl->versions_->GetColumnFamilySet()->Exists(cf.name)) {
s = Status::InvalidArgument("Column family not found: ", cf.name);
handles->clear();
break;
}
uint32_t id = impl->versions_->GetColumnFamilySet()->GetID(cf.name);
handles->push_back(ColumnFamilyHandle(id));
auto cfd = impl->versions_->GetColumnFamilySet()->GetColumnFamily(id);
assert(cfd != nullptr);
handles->push_back(
new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
}
}
if (s.ok()) {
@ -3836,6 +3876,9 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) {
*dbptr = impl;
} else {
for (auto h : *handles) {
delete h;
}
handles->clear();
delete impl;
}

@ -45,32 +45,31 @@ class DBImpl : public DB {
// Implementations of the DB interface
using DB::Put;
virtual Status Put(const WriteOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value);
using DB::Merge;
virtual Status Merge(const WriteOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, const Slice& value);
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value);
using DB::Delete;
virtual Status Delete(const WriteOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key);
ColumnFamilyHandle* column_family, const Slice& key);
using DB::Write;
virtual Status Write(const WriteOptions& options, WriteBatch* updates);
using DB::Get;
virtual Status Get(const ReadOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value);
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values);
virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family,
ColumnFamilyHandle* handle);
virtual Status DropColumnFamily(const ColumnFamilyHandle& column_family);
ColumnFamilyHandle** handle);
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family);
// Returns false if key doesn't exist in the database and true if it may.
// If value_found is not passed in as null, then return the value if found in
@ -78,43 +77,41 @@ class DBImpl : public DB {
// , otherwise false.
using DB::KeyMayExist;
virtual bool KeyMayExist(const ReadOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value,
bool* value_found = nullptr);
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found = nullptr);
using DB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& options,
const ColumnFamilyHandle& column_family);
ColumnFamilyHandle* column_family);
virtual Status NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
std::vector<Iterator*>* iterators);
virtual const Snapshot* GetSnapshot();
virtual void ReleaseSnapshot(const Snapshot* snapshot);
using DB::GetProperty;
virtual bool GetProperty(const ColumnFamilyHandle& column_family,
virtual bool GetProperty(ColumnFamilyHandle* column_family,
const Slice& property, std::string* value);
using DB::GetApproximateSizes;
virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family,
virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
const Range* range, int n, uint64_t* sizes);
using DB::CompactRange;
virtual Status CompactRange(const ColumnFamilyHandle& column_family,
virtual Status CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1);
using DB::NumberLevels;
virtual int NumberLevels(const ColumnFamilyHandle& column_family);
virtual int NumberLevels(ColumnFamilyHandle* column_family);
using DB::MaxMemCompactionLevel;
virtual int MaxMemCompactionLevel(const ColumnFamilyHandle& column_family);
virtual int MaxMemCompactionLevel(ColumnFamilyHandle* column_family);
using DB::Level0StopWriteTrigger;
virtual int Level0StopWriteTrigger(const ColumnFamilyHandle& column_family);
virtual int Level0StopWriteTrigger(ColumnFamilyHandle* column_family);
virtual const std::string& GetName() const;
virtual Env* GetEnv() const;
using DB::GetOptions;
virtual const Options& GetOptions(const ColumnFamilyHandle& column_family)
const;
virtual const Options& GetOptions(ColumnFamilyHandle* column_family) const;
using DB::Flush;
virtual Status Flush(const FlushOptions& options,
const ColumnFamilyHandle& column_family);
ColumnFamilyHandle* column_family);
virtual Status DisableFileDeletions();
virtual Status EnableFileDeletions(bool force);
// All the returned filenames start with "/"
@ -246,6 +243,8 @@ class DBImpl : public DB {
// It is not necessary to hold the mutex when invoking this method.
void PurgeObsoleteFiles(DeletionState& deletion_state);
ColumnFamilyHandle* DefaultColumnFamily() const;
protected:
Env* const env_;
const std::string dbname_;
@ -381,7 +380,7 @@ class DBImpl : public DB {
port::CondVar bg_cv_; // Signalled when background work finishes
uint64_t logfile_number_;
unique_ptr<log::Writer> log_;
ColumnFamilyData* default_cfd_;
ColumnFamilyHandleImpl* default_cf_handle_;
unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
std::string host_name_;
@ -493,9 +492,9 @@ class DBImpl : public DB {
// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
std::string* value, bool* value_found = nullptr);
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value,
bool* value_found = nullptr);
};
// Sanitize db options. The caller should delete result.info_log if

@ -29,7 +29,6 @@
#include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/column_family.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/merge_operator.h"
@ -54,11 +53,12 @@ DBImplReadOnly::~DBImplReadOnly() {
// Implementations of the DB interface
Status DBImplReadOnly::Get(const ReadOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value) {
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) {
Status s;
SequenceNumber snapshot = versions_->LastSequence();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
SuperVersion* super_version = cfd->GetSuperVersion();
MergeContext merge_context;
LookupKey lkey(key, snapshot);
@ -73,9 +73,9 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
}
Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options,
const ColumnFamilyHandle& column_family) {
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
assert(cfd != nullptr);
ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
SequenceNumber latest_snapshot = versions_->LastSequence();
Iterator* internal_iter = NewInternalIterator(options, cfd, super_version);

@ -32,18 +32,18 @@ class DBImplReadOnly : public DBImpl {
// Implementations of the DB interface
using DB::Get;
virtual Status Get(const ReadOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value);
// TODO: Implement ReadOnly MultiGet?
using DBImpl::NewIterator;
virtual Iterator* NewIterator(const ReadOptions&,
const ColumnFamilyHandle& column_family);
ColumnFamilyHandle* column_family);
virtual Status NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
std::vector<Iterator*>* iterators) {
// TODO
return Status::NotSupported("Not supported yet.");
@ -51,27 +51,26 @@ class DBImplReadOnly : public DBImpl {
using DBImpl::Put;
virtual Status Put(const WriteOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::Merge;
virtual Status Merge(const WriteOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, const Slice& value) {
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::Delete;
virtual Status Delete(const WriteOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key) {
ColumnFamilyHandle* column_family, const Slice& key) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Write(const WriteOptions& options, WriteBatch* updates) {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::CompactRange;
virtual Status CompactRange(const ColumnFamilyHandle& column_family,
virtual Status CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) {
@ -90,7 +89,7 @@ class DBImplReadOnly : public DBImpl {
}
using DBImpl::Flush;
virtual Status Flush(const FlushOptions& options,
const ColumnFamilyHandle& column_family) {
ColumnFamilyHandle* column_family) {
return Status::NotSupported("Not supported operation in read only mode.");
}

@ -65,7 +65,7 @@ void DBImpl::LogDBDeployStats() {
uint64_t file_total_size = 0;
uint32_t file_total_num = 0;
Version* current = default_cfd_->current();
Version* current = default_cf_handle_->cfd()->current();
for (int i = 0; i < current->NumberLevels(); i++) {
file_total_num += current->NumLevelFiles(i);
file_total_size += current->NumLevelBytes(i);

@ -4741,24 +4741,30 @@ class ModelDB: public DB {
KVMap map_;
};
explicit ModelDB(const Options& options): options_(options) { }
explicit ModelDB(const Options& options) : options_(options) {}
using DB::Put;
virtual Status Put(const WriteOptions& o, const ColumnFamilyHandle& cf,
virtual Status Put(const WriteOptions& o, ColumnFamilyHandle* cf,
const Slice& k, const Slice& v) {
return DB::Put(o, cf, k, v);
WriteBatch batch;
batch.Put(0, k, v);
return Write(o, &batch);
}
using DB::Merge;
virtual Status Merge(const WriteOptions& o, const ColumnFamilyHandle& cf,
virtual Status Merge(const WriteOptions& o, ColumnFamilyHandle* cf,
const Slice& k, const Slice& v) {
return DB::Merge(o, cf, k, v);
WriteBatch batch;
batch.Merge(0, k, v);
return Write(o, &batch);
}
using DB::Delete;
virtual Status Delete(const WriteOptions& o, const ColumnFamilyHandle& cf,
virtual Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf,
const Slice& key) {
return DB::Delete(o, cf, key);
WriteBatch batch;
batch.Delete(0, key);
return Write(o, &batch);
}
using DB::Get;
virtual Status Get(const ReadOptions& options, const ColumnFamilyHandle& cf,
virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf,
const Slice& key, std::string* value) {
return Status::NotSupported(key);
}
@ -4766,7 +4772,7 @@ class ModelDB: public DB {
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
std::vector<Status> s(keys.size(),
Status::NotSupported("Not implemented."));
@ -4774,9 +4780,8 @@ class ModelDB: public DB {
}
using DB::KeyMayExist;
virtual bool KeyMayExist(const ReadOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value,
bool* value_found = nullptr) {
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found = nullptr) {
if (value_found != nullptr) {
*value_found = false;
}
@ -4784,7 +4789,7 @@ class ModelDB: public DB {
}
using DB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& options,
const ColumnFamilyHandle& column_family) {
ColumnFamilyHandle* column_family) {
if (options.snapshot == nullptr) {
KVMap* saved = new KVMap;
*saved = map_;
@ -4797,7 +4802,7 @@ class ModelDB: public DB {
}
virtual Status NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
std::vector<Iterator*>* iterators) {
return Status::NotSupported("Not supported yet");
}
@ -4831,36 +4836,34 @@ class ModelDB: public DB {
}
using DB::GetProperty;
virtual bool GetProperty(const ColumnFamilyHandle& column_family,
virtual bool GetProperty(ColumnFamilyHandle* column_family,
const Slice& property, std::string* value) {
return false;
}
using DB::GetApproximateSizes;
virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family,
virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
const Range* range, int n, uint64_t* sizes) {
for (int i = 0; i < n; i++) {
sizes[i] = 0;
}
}
using DB::CompactRange;
virtual Status CompactRange(const ColumnFamilyHandle& column_family,
virtual Status CompactRange(ColumnFamilyHandle* column_family,
const Slice* start, const Slice* end,
bool reduce_level, int target_level) {
return Status::NotSupported("Not supported operation.");
}
using DB::NumberLevels;
virtual int NumberLevels(const ColumnFamilyHandle& column_family) {
return 1;
}
virtual int NumberLevels(ColumnFamilyHandle* column_family) { return 1; }
using DB::MaxMemCompactionLevel;
virtual int MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) {
virtual int MaxMemCompactionLevel(ColumnFamilyHandle* column_family) {
return 1;
}
using DB::Level0StopWriteTrigger;
virtual int Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) {
virtual int Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
return -1;
}
@ -4873,14 +4876,13 @@ class ModelDB: public DB {
}
using DB::GetOptions;
virtual const Options& GetOptions(const ColumnFamilyHandle& column_family)
const {
virtual const Options& GetOptions(ColumnFamilyHandle* column_family) const {
return options_;
}
using DB::Flush;
virtual Status Flush(const rocksdb::FlushOptions& options,
const ColumnFamilyHandle& column_family) {
ColumnFamilyHandle* column_family) {
Status ret;
return ret;
}
@ -4916,6 +4918,8 @@ class ModelDB: public DB {
return Status::NotSupported("Not supported in Model DB");
}
virtual ColumnFamilyHandle* DefaultColumnFamily() const { return nullptr; }
private:
class ModelIter: public Iterator {
public:

@ -1396,9 +1396,6 @@ VersionSet::VersionSet(const std::string& dbname, const DBOptions* options,
storage_options_compactions_(storage_options_) {}
VersionSet::~VersionSet() {
for (auto cfd : *column_family_set_) {
cfd->current()->Unref();
}
// we need to delete column_family_set_ because its destructor depends on
// VersionSet
column_family_set_.reset();
@ -1434,6 +1431,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
bool new_descriptor_log) {
mu->AssertHeld();
if (column_family_data->IsDropped()) {
// no need to write anything to the manifest
return Status::OK();
}
// queue our request
ManifestWriter w(mu, column_family_data, edit);
manifest_writers_.push_back(&w);
@ -1759,7 +1761,13 @@ Status VersionSet::Recover(
assert(builder != builders.end());
delete builder->second;
builders.erase(builder);
DropColumnFamily(&edit);
auto cfd = column_family_set_->GetColumnFamily(edit.column_family_);
if (cfd->Unref()) {
delete cfd;
} else {
// who else can have reference to cfd!?
assert(false);
}
} else if (cf_in_not_found) {
column_families_not_found.erase(edit.column_family_);
} else {
@ -2433,8 +2441,4 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
return new_cfd;
}
void VersionSet::DropColumnFamily(VersionEdit* edit) {
column_family_set_->DropColumnFamily(edit->column_family_);
}
} // namespace rocksdb

@ -396,8 +396,6 @@ class VersionSet {
ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& options,
VersionEdit* edit);
void DropColumnFamily(VersionEdit* edit);
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
private:

@ -277,7 +277,13 @@ class MemTableInserter : public WriteBatch::Handler {
std::string prev_value;
std::string merged_value;
Status s = db_->Get(ropts, key, &prev_value);
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
Status s = db_->Get(ropts, cf_handle, key, &prev_value);
char* prev_buffer = const_cast<char*>(prev_value.c_str());
uint32_t prev_size = prev_value.size();
auto status = options->inplace_callback(s.ok() ? prev_buffer : nullptr,
@ -333,8 +339,11 @@ class MemTableInserter : public WriteBatch::Handler {
ReadOptions read_options;
read_options.snapshot = &read_from_snapshot;
db_->Get(read_options, cf_mems_->GetColumnFamilyHandle(), key,
&get_value);
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
db_->Get(read_options, cf_handle, key, &get_value);
Slice get_value_slice = Slice(get_value);
// 2) Apply this merge
@ -378,8 +387,11 @@ class MemTableInserter : public WriteBatch::Handler {
ReadOptions ropts;
ropts.snapshot = &read_from_snapshot;
std::string value;
if (!db_->KeyMayExist(ropts, cf_mems_->GetColumnFamilyHandle(), key,
&value)) {
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
RecordTick(options->statistics.get(), NUMBER_FILTERED_DELETES);
return;
}

@ -12,7 +12,6 @@
#include "rocksdb/write_batch.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/column_family.h"
namespace rocksdb {
@ -28,7 +27,7 @@ class ColumnFamilyMemTables {
virtual uint64_t GetLogNumber() const = 0;
virtual MemTable* GetMemTable() const = 0;
virtual const Options* GetFullOptions() const = 0;
virtual const ColumnFamilyHandle& GetColumnFamilyHandle() const = 0;
virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;
};
class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
@ -53,9 +52,7 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
return options_;
}
const ColumnFamilyHandle& GetColumnFamilyHandle() const override {
return default_column_family;
}
ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; }
private:
bool ok_;

@ -1,31 +0,0 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/slice.h"
#include <string>
namespace rocksdb {
// Column family's name is translated to ColumnFamilyHandle at DB open or column
// family open time. Clients use ColumnFamilyHandle to comunicate with the DB
//
// Column family names that start with "." (a dot) are system specific and
// should not be used by the clients
struct ColumnFamilyHandle {
uint32_t id;
// default
ColumnFamilyHandle() : id() {}
explicit ColumnFamilyHandle(uint32_t _id) : id(_id) {}
};
const ColumnFamilyHandle default_column_family = ColumnFamilyHandle();
extern const std::string default_column_family_name;
} // namespace rocksdb

@ -23,8 +23,15 @@ namespace rocksdb {
using std::unique_ptr;
struct ColumnFamilyHandle;
extern const ColumnFamilyHandle default_column_family;
class ColumnFamilyHandle {
public:
ColumnFamilyHandle() {}
virtual ~ColumnFamilyHandle() {}
private:
ColumnFamilyHandle(const ColumnFamilyHandle&); // no copying
};
extern const std::string default_column_family_name;
struct ColumnFamilyDescriptor {
std::string name;
@ -106,7 +113,7 @@ class DB {
// will use to operate on column family column_family[i]
static Status Open(const DBOptions& db_options, const std::string& name,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle>* handles, DB** dbptr);
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);
// ListColumnFamilies will open the DB specified by argument name
// and return the list of all column families in that DB
@ -123,23 +130,22 @@ class DB {
// through the argument handle.
virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle* handle);
ColumnFamilyHandle** handle);
// Drop a column family specified by column_family handle.
// All data related to the column family will be deleted before
// the function returns.
// Calls referring to the dropped column family will fail.
virtual Status DropColumnFamily(const ColumnFamilyHandle& column_family);
// Drop a column family specified by column_family handle. This call
// only records a drop record in the manifest and prevents the column
// family from flushing and compacting.
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family);
// Set the database entry for "key" to "value".
// Returns OK on success, and a non-OK status on error.
// Note: consider setting options.sync = true.
virtual Status Put(const WriteOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
Status Put(const WriteOptions& options, const Slice& key,
const Slice& value) {
return Put(options, default_column_family, key, value);
return Put(options, DefaultColumnFamily(), key, value);
}
// Remove the database entry (if any) for "key". Returns OK on
@ -147,10 +153,10 @@ class DB {
// did not exist in the database.
// Note: consider setting options.sync = true.
virtual Status Delete(const WriteOptions& options,
const ColumnFamilyHandle& column_family,
ColumnFamilyHandle* column_family,
const Slice& key) = 0;
Status Delete(const WriteOptions& options, const Slice& key) {
return Delete(options, default_column_family, key);
return Delete(options, DefaultColumnFamily(), key);
}
// Merge the database entry for "key" with "value". Returns OK on success,
@ -158,11 +164,11 @@ class DB {
// determined by the user provided merge_operator when opening DB.
// Note: consider setting options.sync = true.
virtual Status Merge(const WriteOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, const Slice& value) = 0;
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
Status Merge(const WriteOptions& options, const Slice& key,
const Slice& value) {
return Merge(options, default_column_family, key, value);
return Merge(options, DefaultColumnFamily(), key, value);
}
// Apply the specified updates to the database.
@ -178,10 +184,10 @@ class DB {
//
// May return some other Status on an error.
virtual Status Get(const ReadOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) = 0;
Status Get(const ReadOptions& options, const Slice& key, std::string* value) {
return Get(options, default_column_family, key, value);
return Get(options, DefaultColumnFamily(), key, value);
}
// If keys[i] does not exist in the database, then the i'th returned
@ -196,13 +202,13 @@ class DB {
// duplicate values in order.
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) = 0;
std::vector<Status> MultiGet(const ReadOptions& options,
const std::vector<Slice>& keys,
std::vector<std::string>* values) {
return MultiGet(options, std::vector<ColumnFamilyHandle>(
keys.size(), default_column_family),
return MultiGet(options, std::vector<ColumnFamilyHandle*>(
keys.size(), DefaultColumnFamily()),
keys, values);
}
@ -214,9 +220,8 @@ class DB {
// to make this lighter weight is to avoid doing any IOs.
// Default implementation here returns true and sets 'value_found' to false
virtual bool KeyMayExist(const ReadOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value,
bool* value_found = nullptr) {
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found = nullptr) {
if (value_found != nullptr) {
*value_found = false;
}
@ -224,7 +229,7 @@ class DB {
}
bool KeyMayExist(const ReadOptions& options, const Slice& key,
std::string* value, bool* value_found = nullptr) {
return KeyMayExist(options, default_column_family, key, value, value_found);
return KeyMayExist(options, DefaultColumnFamily(), key, value, value_found);
}
// Return a heap-allocated iterator over the contents of the database.
@ -234,16 +239,16 @@ class DB {
// Caller should delete the iterator when it is no longer needed.
// The returned iterator should be deleted before this db is deleted.
virtual Iterator* NewIterator(const ReadOptions& options,
const ColumnFamilyHandle& column_family) = 0;
ColumnFamilyHandle* column_family) = 0;
Iterator* NewIterator(const ReadOptions& options) {
return NewIterator(options, default_column_family);
return NewIterator(options, DefaultColumnFamily());
}
// Returns iterators from a consistent database state across multiple
// column families. Iterators are heap allocated and need to be deleted
// before the db is deleted
virtual Status NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
std::vector<Iterator*>* iterators) = 0;
// Return a handle to the current DB state. Iterators created with
@ -270,10 +275,10 @@ class DB {
// about the internal operation of the DB.
// "rocksdb.sstables" - returns a multi-line string that describes all
// of the sstables that make up the db contents.
virtual bool GetProperty(const ColumnFamilyHandle& column_family,
virtual bool GetProperty(ColumnFamilyHandle* column_family,
const Slice& property, std::string* value) = 0;
bool GetProperty(const Slice& property, std::string* value) {
return GetProperty(default_column_family, property, value);
return GetProperty(DefaultColumnFamily(), property, value);
}
// For each i in [0,n-1], store in "sizes[i]", the approximate
@ -284,11 +289,11 @@ class DB {
// sizes will be one-tenth the size of the corresponding user data size.
//
// The results may not include the sizes of recently written data.
virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family,
virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
const Range* range, int n,
uint64_t* sizes) = 0;
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
GetApproximateSizes(default_column_family, range, n, sizes);
GetApproximateSizes(DefaultColumnFamily(), range, n, sizes);
}
// Compact the underlying storage for the key range [*begin,*end].
@ -308,35 +313,31 @@ class DB {
// hosting all the files. In this case, client could set reduce_level
// to true, to move the files back to the minimum level capable of holding
// the data set or a given level (specified by non-negative target_level).
virtual Status CompactRange(const ColumnFamilyHandle& column_family,
virtual Status CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) = 0;
Status CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1) {
return CompactRange(default_column_family, begin, end, reduce_level,
return CompactRange(DefaultColumnFamily(), begin, end, reduce_level,
target_level);
}
// Number of levels used for this DB.
virtual int NumberLevels(const ColumnFamilyHandle& column_family) = 0;
int NumberLevels() {
return NumberLevels(default_column_family);
}
virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0;
int NumberLevels() { return NumberLevels(DefaultColumnFamily()); }
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap.
virtual int MaxMemCompactionLevel(
const ColumnFamilyHandle& column_family) = 0;
virtual int MaxMemCompactionLevel(ColumnFamilyHandle* column_family) = 0;
int MaxMemCompactionLevel() {
return MaxMemCompactionLevel(default_column_family);
return MaxMemCompactionLevel(DefaultColumnFamily());
}
// Number of files in level-0 that would stop writes.
virtual int Level0StopWriteTrigger(
const ColumnFamilyHandle& column_family) = 0;
virtual int Level0StopWriteTrigger(ColumnFamilyHandle* column_family) = 0;
int Level0StopWriteTrigger() {
return Level0StopWriteTrigger(default_column_family);
return Level0StopWriteTrigger(DefaultColumnFamily());
}
// Get DB name -- the exact same name that was provided as an argument to
@ -347,17 +348,17 @@ class DB {
virtual Env* GetEnv() const = 0;
// Get DB Options that we use
virtual const Options& GetOptions(const ColumnFamilyHandle& column_family)
virtual const Options& GetOptions(ColumnFamilyHandle* column_family)
const = 0;
const Options& GetOptions() const {
return GetOptions(default_column_family);
return GetOptions(DefaultColumnFamily());
}
// Flush all mem-table data.
virtual Status Flush(const FlushOptions& options,
const ColumnFamilyHandle& column_family) = 0;
ColumnFamilyHandle* column_family) = 0;
Status Flush(const FlushOptions& options) {
return Flush(options, default_column_family);
return Flush(options, DefaultColumnFamily());
}
// Prevent file deletions. Compactions will continue to occur,
@ -426,6 +427,9 @@ class DB {
// be set properly
virtual Status GetDbIdentity(std::string& identity) = 0;
// Returns default column family handle
virtual ColumnFamilyHandle* DefaultColumnFamily() const = 0;
private:
// No copying allowed
DB(const DB&);

@ -27,7 +27,6 @@
#include <string>
#include "rocksdb/status.h"
#include "rocksdb/column_family.h"
namespace rocksdb {

@ -23,14 +23,14 @@ class StackableDB : public DB {
using DB::Put;
virtual Status Put(const WriteOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& val) override {
return db_->Put(options, column_family, key, val);
}
using DB::Get;
virtual Status Get(const ReadOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override {
return db_->Get(options, column_family, key, value);
}
@ -38,7 +38,7 @@ class StackableDB : public DB {
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override {
return db_->MultiGet(options, column_family, keys, values);
@ -46,23 +46,23 @@ class StackableDB : public DB {
using DB::KeyMayExist;
virtual bool KeyMayExist(const ReadOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value,
bool* value_found = nullptr) override {
return db_->KeyMayExist(options, column_family, key, value, value_found);
}
using DB::Delete;
virtual Status Delete(const WriteOptions& wopts,
const ColumnFamilyHandle& column_family,
ColumnFamilyHandle* column_family,
const Slice& key) override {
return db_->Delete(wopts, column_family, key);
}
using DB::Merge;
virtual Status Merge(const WriteOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, const Slice& value) override {
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override {
return db_->Merge(options, column_family, key, value);
}
@ -74,14 +74,13 @@ class StackableDB : public DB {
using DB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& opts,
const ColumnFamilyHandle& column_family)
override {
ColumnFamilyHandle* column_family) override {
return db_->NewIterator(opts, column_family);
}
virtual Status NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
std::vector<Iterator*>* iterators) {
return db_->NewIterators(options, column_family, iterators);
}
@ -96,20 +95,20 @@ class StackableDB : public DB {
}
using DB::GetProperty;
virtual bool GetProperty(const ColumnFamilyHandle& column_family,
virtual bool GetProperty(ColumnFamilyHandle* column_family,
const Slice& property, std::string* value) override {
return db_->GetProperty(column_family, property, value);
}
using DB::GetApproximateSizes;
virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family,
virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
const Range* r, int n,
uint64_t* sizes) override {
return db_->GetApproximateSizes(column_family, r, n, sizes);
}
using DB::CompactRange;
virtual Status CompactRange(const ColumnFamilyHandle& column_family,
virtual Status CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) override {
@ -118,18 +117,18 @@ class StackableDB : public DB {
}
using DB::NumberLevels;
virtual int NumberLevels(const ColumnFamilyHandle& column_family) override {
virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
return db_->NumberLevels(column_family);
}
using DB::MaxMemCompactionLevel;
virtual int MaxMemCompactionLevel(const ColumnFamilyHandle& column_family)
virtual int MaxMemCompactionLevel(ColumnFamilyHandle* column_family)
override {
return db_->MaxMemCompactionLevel(column_family);
}
using DB::Level0StopWriteTrigger;
virtual int Level0StopWriteTrigger(const ColumnFamilyHandle& column_family)
virtual int Level0StopWriteTrigger(ColumnFamilyHandle* column_family)
override {
return db_->Level0StopWriteTrigger(column_family);
}
@ -143,14 +142,14 @@ class StackableDB : public DB {
}
using DB::GetOptions;
virtual const Options& GetOptions(const ColumnFamilyHandle& column_family)
const override {
virtual const Options& GetOptions(ColumnFamilyHandle* column_family) const
override {
return db_->GetOptions(column_family);
}
using DB::Flush;
virtual Status Flush(const FlushOptions& fopts,
const ColumnFamilyHandle& column_family) override {
ColumnFamilyHandle* column_family) override {
return db_->Flush(fopts, column_family);
}
@ -189,6 +188,10 @@ class StackableDB : public DB {
return db_->GetUpdatesSince(seq_number, iter);
}
virtual ColumnFamilyHandle* DefaultColumnFamily() const override {
return db_->DefaultColumnFamily();
}
protected:
DB* db_;
};

@ -46,6 +46,9 @@ class FilterBlockBuilder {
bool SamePrefix(const Slice &key1, const Slice &key2) const;
void GenerateFilter();
// important: all of these might point to invalid addresses
// at the time of destruction of this filter block. destructor
// should NOT dereference them.
const FilterPolicy* policy_;
const SliceTransform* prefix_extractor_;
bool whole_key_filtering_;

@ -11,7 +11,6 @@
#include "db/filename.h"
#include "db/write_batch_internal.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/column_family.h"
#include "rocksdb/cache.h"
#include "util/coding.h"

@ -45,8 +45,8 @@ class DummyDB : public StackableDB {
}
using DB::GetOptions;
virtual const Options& GetOptions(const ColumnFamilyHandle& column_family)
const override {
virtual const Options& GetOptions(ColumnFamilyHandle* column_family) const
override {
return options_;
}
@ -70,6 +70,10 @@ class DummyDB : public StackableDB {
return Status::OK();
}
virtual ColumnFamilyHandle* DefaultColumnFamily() const override {
return nullptr;
}
class DummyLogFile : public LogFile {
public:
/* implicit */

@ -120,7 +120,7 @@ Status DBWithTTL::StripTS(std::string* str) {
}
Status DBWithTTL::Put(const WriteOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& val) {
WriteBatch batch;
batch.Put(key, val);
@ -128,7 +128,7 @@ Status DBWithTTL::Put(const WriteOptions& options,
}
Status DBWithTTL::Get(const ReadOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) {
Status st = db_->Get(options, key, value);
if (!st.ok()) {
@ -143,7 +143,7 @@ Status DBWithTTL::Get(const ReadOptions& options,
std::vector<Status> DBWithTTL::MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
return std::vector<Status>(keys.size(),
Status::NotSupported("MultiGet not\
@ -151,9 +151,8 @@ std::vector<Status> DBWithTTL::MultiGet(
}
bool DBWithTTL::KeyMayExist(const ReadOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value,
bool* value_found) {
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) {
bool ret = db_->KeyMayExist(options, key, value, value_found);
if (ret && value != nullptr && value_found != nullptr && *value_found) {
if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
@ -164,8 +163,8 @@ bool DBWithTTL::KeyMayExist(const ReadOptions& options,
}
Status DBWithTTL::Merge(const WriteOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, const Slice& value) {
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
WriteBatch batch;
batch.Merge(key, value);
return Write(options, &batch);
@ -211,7 +210,7 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
}
Iterator* DBWithTTL::NewIterator(const ReadOptions& opts,
const ColumnFamilyHandle& column_family) {
ColumnFamilyHandle* column_family) {
return new TtlIterator(db_->NewIterator(opts, column_family));
}

@ -22,38 +22,37 @@ class DBWithTTL : public StackableDB {
using StackableDB::Put;
virtual Status Put(const WriteOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& val) override;
using StackableDB::Get;
virtual Status Get(const ReadOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override;
using StackableDB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
using StackableDB::KeyMayExist;
virtual bool KeyMayExist(const ReadOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value,
bool* value_found = nullptr) override;
using StackableDB::Merge;
virtual Status Merge(const WriteOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key, const Slice& value) override;
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
using StackableDB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& opts,
const ColumnFamilyHandle& column_family)
override;
ColumnFamilyHandle* column_family) override;
// Simulate a db crash, no elegant closing of database.
void TEST_Destroy_DBWithTtl();

Loading…
Cancel
Save