MemTableOptions

Summary: removed references to Options in WriteBatch and DBImpl::Get(); MemTable now carries the ImmutableCFOptions and MemTableOptions it needs.

Test Plan: make all check

Reviewers: yhchiang, igor, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D23049
Branch: main
Author: Lei Jin (10 years ago)
Parent: 55114e7f40
Commit: 52311463e9
13 changed files:
  1. db/column_family.cc (3)
  2. db/db_impl.cc (15)
  3. db/db_impl_readonly.cc (11)
  4. db/memtable.cc (87)
  5. db/memtable.h (34)
  6. db/memtable_list.cc (5)
  7. db/memtable_list.h (2)
  8. db/repair.cc (2)
  9. db/write_batch.cc (42)
  10. db/write_batch_test.cc (3)
  11. include/rocksdb/immutable_options.h (2)
  12. table/table_test.cc (17)
  13. util/options.cc (1)

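Note (not part of the diff): as a quick orientation before the per-file hunks, here is a minimal sketch of how a memtable is created after this change. The helper name MakeMemTable is hypothetical; the types are the ones modified below.

#include "db/dbformat.h"
#include "db/memtable.h"
#include "rocksdb/immutable_options.h"
#include "rocksdb/options.h"

// MemTable no longer receives the full mutable Options. The caller snapshots
// the memtable-related fields into a MemTableOptions (stored by value) and
// passes the column family's ImmutableCFOptions (stored by reference, so it
// must outlive the memtable).
rocksdb::MemTable* MakeMemTable(const rocksdb::InternalKeyComparator& cmp,
                                const rocksdb::ImmutableCFOptions& ioptions,
                                const rocksdb::Options& options) {
  rocksdb::MemTableOptions moptions(options);
  rocksdb::MemTable* mem = new rocksdb::MemTable(cmp, ioptions, moptions);
  mem->Ref();  // MemTables are reference counted; callers must Ref() at least once.
  return mem;
}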
db/column_family.cc
@@ -383,7 +383,8 @@ void ColumnFamilyData::CreateNewMemtable() {
   if (mem_ != nullptr) {
     delete mem_->Unref();
   }
-  mem_ = new MemTable(internal_comparator_, options_);
+  mem_ = new MemTable(internal_comparator_, ioptions_,
+                      MemTableOptions(options_));
   mem_->Ref();
 }

db/db_impl.cc
@@ -3434,10 +3434,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
   LookupKey lkey(key, snapshot);
   PERF_TIMER_STOP(get_snapshot_time);
-  if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->options())) {
+  if (sv->mem->Get(lkey, value, &s, merge_context)) {
     // Done
     RecordTick(stats_, MEMTABLE_HIT);
-  } else if (sv->imm->Get(lkey, value, &s, merge_context, *cfd->options())) {
+  } else if (sv->imm->Get(lkey, value, &s, merge_context)) {
     // Done
     RecordTick(stats_, MEMTABLE_HIT);
   } else {
@@ -3522,12 +3522,9 @@ std::vector<Status> DBImpl::MultiGet(
     assert(mgd_iter != multiget_cf_data.end());
     auto mgd = mgd_iter->second;
     auto super_version = mgd->super_version;
-    auto cfd = mgd->cfd;
-    if (super_version->mem->Get(lkey, value, &s, merge_context,
-                                *cfd->options())) {
+    if (super_version->mem->Get(lkey, value, &s, merge_context)) {
       // Done
-    } else if (super_version->imm->Get(lkey, value, &s, merge_context,
-                                       *cfd->options())) {
+    } else if (super_version->imm->Get(lkey, value, &s, merge_context)) {
       // Done
     } else {
       super_version->current->Get(options, lkey, value, &s, &merge_context);
@@ -4294,7 +4291,9 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
   }
   if (s.ok()) {
-    new_mem = new MemTable(cfd->internal_comparator(), *cfd->options());
+    new_mem = new MemTable(cfd->internal_comparator(),
+                           *cfd->ioptions(),
+                           MemTableOptions(*cfd->options()));
     new_superversion = new SuperVersion();
   }
 }

db/db_impl_readonly.cc
@@ -41,9 +41,9 @@
 namespace rocksdb {
-DBImplReadOnly::DBImplReadOnly(const DBOptions& options,
+DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
     const std::string& dbname)
-    : DBImpl(options, dbname) {
+    : DBImpl(db_options, dbname) {
   Log(db_options_.info_log, "Opening the db in read only mode");
 }
@@ -51,7 +51,7 @@ DBImplReadOnly::~DBImplReadOnly() {
 }
 // Implementations of the DB interface
-Status DBImplReadOnly::Get(const ReadOptions& options,
+Status DBImplReadOnly::Get(const ReadOptions& read_options,
     ColumnFamilyHandle* column_family, const Slice& key,
     std::string* value) {
   Status s;
@@ -61,10 +61,9 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
   SuperVersion* super_version = cfd->GetSuperVersion();
   MergeContext merge_context;
   LookupKey lkey(key, snapshot);
-  if (super_version->mem->Get(lkey, value, &s, merge_context,
-                              *cfd->options())) {
+  if (super_version->mem->Get(lkey, value, &s, merge_context)) {
   } else {
-    super_version->current->Get(options, lkey, value, &s, &merge_context);
+    super_version->current->Get(read_options, lkey, value, &s, &merge_context);
   }
   return s;
 }

db/memtable.cc
@@ -31,35 +31,51 @@
 namespace rocksdb {
-MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
+MemTableOptions::MemTableOptions(const Options& options)
+  : write_buffer_size(options.write_buffer_size),
+    arena_block_size(options.arena_block_size),
+    memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits),
+    memtable_prefix_bloom_probes(options.memtable_prefix_bloom_probes),
+    memtable_prefix_bloom_huge_page_tlb_size(
+        options.memtable_prefix_bloom_huge_page_tlb_size),
+    inplace_update_support(options.inplace_update_support),
+    inplace_update_num_locks(options.inplace_update_num_locks),
+    inplace_callback(options.inplace_callback),
+    max_successive_merges(options.max_successive_merges),
+    filter_deletes(options.filter_deletes) {}
+
+MemTable::MemTable(const InternalKeyComparator& cmp,
+                   const ImmutableCFOptions& ioptions,
+                   const MemTableOptions& moptions)
     : comparator_(cmp),
+      ioptions_(ioptions),
+      moptions_(moptions),
       refs_(0),
-      kArenaBlockSize(OptimizeBlockSize(options.arena_block_size)),
-      kWriteBufferSize(options.write_buffer_size),
-      arena_(options.arena_block_size),
-      table_(options.memtable_factory->CreateMemTableRep(
-          comparator_, &arena_, options.prefix_extractor.get(),
-          options.info_log.get())),
+      kArenaBlockSize(OptimizeBlockSize(moptions.arena_block_size)),
+      arena_(moptions.arena_block_size),
+      table_(ioptions.memtable_factory->CreateMemTableRep(
+          comparator_, &arena_, ioptions.prefix_extractor,
+          ioptions.info_log)),
       num_entries_(0),
       flush_in_progress_(false),
       flush_completed_(false),
       file_number_(0),
      first_seqno_(0),
      mem_next_logfile_number_(0),
-      locks_(options.inplace_update_support ? options.inplace_update_num_locks
-                                            : 0),
-      prefix_extractor_(options.prefix_extractor.get()),
+      locks_(moptions.inplace_update_support ? moptions.inplace_update_num_locks
+                                             : 0),
+      prefix_extractor_(ioptions.prefix_extractor),
       should_flush_(ShouldFlushNow()) {
   // if should_flush_ == true without an entry inserted, something must have
   // gone wrong already.
   assert(!should_flush_);
-  if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
+  if (prefix_extractor_ && moptions.memtable_prefix_bloom_bits > 0) {
     prefix_bloom_.reset(new DynamicBloom(
         &arena_,
-        options.memtable_prefix_bloom_bits, options.bloom_locality,
-        options.memtable_prefix_bloom_probes, nullptr,
-        options.memtable_prefix_bloom_huge_page_tlb_size,
-        options.info_log.get()));
+        moptions.memtable_prefix_bloom_bits, ioptions.bloom_locality,
+        moptions.memtable_prefix_bloom_probes, nullptr,
+        moptions.memtable_prefix_bloom_huge_page_tlb_size,
+        ioptions.info_log));
   }
 }
@@ -97,14 +113,16 @@ bool MemTable::ShouldFlushNow() const {
   // if we can still allocate one more block without exceeding the
   // over-allocation ratio, then we should not flush.
   if (allocated_memory + kArenaBlockSize <
-      kWriteBufferSize + kArenaBlockSize * kAllowOverAllocationRatio) {
+      moptions_.write_buffer_size +
+          kArenaBlockSize * kAllowOverAllocationRatio) {
     return false;
   }
-  // if user keeps adding entries that exceeds kWriteBufferSize, we need to
-  // flush earlier even though we still have much available memory left.
-  if (allocated_memory >
-      kWriteBufferSize + kArenaBlockSize * kAllowOverAllocationRatio) {
+  // if user keeps adding entries that exceeds moptions.write_buffer_size,
+  // we need to flush earlier even though we still have much available
+  // memory left.
+  if (allocated_memory > moptions_.write_buffer_size +
+                             kArenaBlockSize * kAllowOverAllocationRatio) {
     return true;
   }
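Note (not part of the diff): the flush heuristic this hunk re-expresses in terms of moptions_.write_buffer_size can be restated as a standalone sketch. The function below is illustrative only; kAllowOverAllocationRatio is a constant defined elsewhere in memtable.cc and is passed in as a parameter here.

// Returns the flush decision implied by the two checks above: keep writing
// while one more arena block still fits within the write buffer plus the
// allowed slack, flush once the arena has outgrown that budget.
static bool ShouldFlushSketch(size_t allocated_memory,
                              size_t write_buffer_size,
                              size_t arena_block_size,
                              double allow_over_allocation_ratio) {
  const double slack = arena_block_size * allow_over_allocation_ratio;
  if (allocated_memory + arena_block_size < write_buffer_size + slack) {
    return false;  // one more block still fits: do not flush yet
  }
  if (allocated_memory > write_buffer_size + slack) {
    return true;   // already past the budget plus slack: flush now
  }
  // The real MemTable::ShouldFlushNow() applies one more heuristic here,
  // based on how full the most recently allocated arena block is; this
  // sketch simply defaults to not flushing.
  return false;
}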
@@ -175,12 +193,12 @@ const char* EncodeKey(std::string* scratch, const Slice& target) {
 class MemTableIterator: public Iterator {
  public:
   MemTableIterator(
-      const MemTable& mem, const ReadOptions& options, Arena* arena)
+      const MemTable& mem, const ReadOptions& read_options, Arena* arena)
       : bloom_(nullptr),
         prefix_extractor_(mem.prefix_extractor_),
         valid_(false),
         arena_mode_(arena != nullptr) {
-    if (prefix_extractor_ != nullptr && !options.total_order_seek) {
+    if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
       bloom_ = mem.prefix_bloom_.get();
       iter_ = mem.table_->GetDynamicPrefixIterator(arena);
     } else {
@@ -248,10 +266,10 @@ class MemTableIterator: public Iterator {
   void operator=(const MemTableIterator&);
 };
-Iterator* MemTable::NewIterator(const ReadOptions& options, Arena* arena) {
+Iterator* MemTable::NewIterator(const ReadOptions& read_options, Arena* arena) {
   assert(arena != nullptr);
   auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
-  return new (mem) MemTableIterator(*this, options, arena);
+  return new (mem) MemTableIterator(*this, read_options, arena);
 }
 port::RWMutex* MemTable::GetLock(const Slice& key) {
@@ -412,7 +430,7 @@ static bool SaveValue(void* arg, const char* entry) {
 }
 bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
-                   MergeContext& merge_context, const Options& options) {
+                   MergeContext& merge_context) {
   // The sequence number is updated synchronously in version_set.h
   if (IsEmpty()) {
     // Avoiding recording stats for speed.
@@ -437,10 +455,10 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
     saver.status = s;
     saver.mem = this;
     saver.merge_context = &merge_context;
-    saver.merge_operator = options.merge_operator.get();
-    saver.logger = options.info_log.get();
-    saver.inplace_update_support = options.inplace_update_support;
-    saver.statistics = options.statistics.get();
+    saver.merge_operator = ioptions_.merge_operator;
+    saver.logger = ioptions_.info_log;
+    saver.inplace_update_support = moptions_.inplace_update_support;
+    saver.statistics = ioptions_.statistics;
     table_->Get(key, &saver, SaveValue);
   }
@@ -512,8 +530,7 @@ void MemTable::Update(SequenceNumber seq,
 bool MemTable::UpdateCallback(SequenceNumber seq,
                               const Slice& key,
-                              const Slice& delta,
-                              const Options& options) {
+                              const Slice& delta) {
   LookupKey lkey(key, seq);
   Slice memkey = lkey.memtable_key();
@@ -548,8 +565,8 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
         std::string str_value;
         WriteLock wl(GetLock(lkey.user_key()));
-        auto status = options.inplace_callback(prev_buffer, &new_prev_size,
-                                               delta, &str_value);
+        auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
+                                                 delta, &str_value);
         if (status == UpdateStatus::UPDATED_INPLACE) {
           // Value already updated by callback.
           assert(new_prev_size <= prev_size);
@@ -562,12 +579,12 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
             memcpy(p, prev_buffer, new_prev_size);
           }
         }
-        RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED);
+        RecordTick(ioptions_.statistics, NUMBER_KEYS_UPDATED);
         should_flush_ = ShouldFlushNow();
         return true;
       } else if (status == UpdateStatus::UPDATED) {
         Add(seq, kTypeValue, key, Slice(str_value));
-        RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN);
+        RecordTick(ioptions_.statistics, NUMBER_KEYS_WRITTEN);
         should_flush_ = ShouldFlushNow();
         return true;
       } else if (status == UpdateStatus::UPDATE_FAILED) {

db/memtable.h
@@ -16,6 +16,7 @@
 #include "db/version_edit.h"
 #include "rocksdb/db.h"
 #include "rocksdb/memtablerep.h"
+#include "rocksdb/immutable_options.h"
 #include "util/arena.h"
 #include "util/dynamic_bloom.h"
@@ -26,6 +27,23 @@ class Mutex;
 class MemTableIterator;
 class MergeContext;
+struct MemTableOptions {
+  explicit MemTableOptions(const Options& options);
+  size_t write_buffer_size;
+  size_t arena_block_size;
+  uint32_t memtable_prefix_bloom_bits;
+  uint32_t memtable_prefix_bloom_probes;
+  size_t memtable_prefix_bloom_huge_page_tlb_size;
+  bool inplace_update_support;
+  size_t inplace_update_num_locks;
+  UpdateStatus (*inplace_callback)(char* existing_value,
+                                   uint32_t* existing_value_size,
+                                   Slice delta_value,
+                                   std::string* merged_value);
+  size_t max_successive_merges;
+  bool filter_deletes;
+};
 class MemTable {
  public:
   struct KeyComparator : public MemTableRep::KeyComparator {
@@ -40,7 +58,8 @@ class MemTable {
   // MemTables are reference counted. The initial reference count
   // is zero and the caller must call Ref() at least once.
   explicit MemTable(const InternalKeyComparator& comparator,
-                    const Options& options);
+                    const ImmutableCFOptions& ioptions,
+                    const MemTableOptions& moptions);
   ~MemTable();
@@ -81,7 +100,7 @@ class MemTable {
   // arena: If not null, the arena needs to be used to allocate the Iterator.
   //        Calling ~Iterator of the iterator will destroy all the states but
   //        those allocated in arena.
-  Iterator* NewIterator(const ReadOptions& options, Arena* arena);
+  Iterator* NewIterator(const ReadOptions& read_options, Arena* arena);
   // Add an entry into memtable that maps key to value at the
   // specified sequence number and with the specified type.
@@ -99,7 +118,7 @@ class MemTable {
   // store MergeInProgress in s, and return false.
   // Else, return false.
   bool Get(const LookupKey& key, std::string* value, Status* s,
-           MergeContext& merge_context, const Options& options);
+           MergeContext& merge_context);
   // Attempts to update the new_value inplace, else does normal Add
   // Pseudocode
@@ -123,8 +142,7 @@ class MemTable {
   //   else return false
   bool UpdateCallback(SequenceNumber seq,
                       const Slice& key,
-                      const Slice& delta,
-                      const Options& options);
+                      const Slice& delta);
   // Returns the number of successive merge entries starting from the newest
   // entry for the key up to the last non-merge entry or last entry for the
@@ -172,6 +190,9 @@ class MemTable {
   const Arena& TEST_GetArena() const { return arena_; }
+  const ImmutableCFOptions* GetImmutableOptions() const { return &ioptions_; }
+  const MemTableOptions* GetMemTableOptions() const { return &moptions_; }
  private:
   // Dynamically check if we can add more incoming entries.
   bool ShouldFlushNow() const;
@@ -181,9 +202,10 @@ class MemTable {
   friend class MemTableList;
   KeyComparator comparator_;
+  const ImmutableCFOptions& ioptions_;
+  const MemTableOptions moptions_;
   int refs_;
   const size_t kArenaBlockSize;
-  const size_t kWriteBufferSize;
   Arena arena_;
   unique_ptr<MemTableRep> table_;
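Note (not part of the diff): the two new getters are what let WriteBatch's MemTableInserter (below) and other callers drop their const Options& parameters. A minimal sketch of the pattern, with a hypothetical helper name:

// Code that only holds a MemTable* can now read the relevant settings
// directly from the memtable instead of having an Options threaded through.
// Mirrors the check MemTableInserter::PutCF makes before taking the
// user-callback in-place update path.
bool UsesInplaceCallback(const rocksdb::MemTable* mem) {
  const rocksdb::MemTableOptions* moptions = mem->GetMemTableOptions();
  return moptions->inplace_update_support &&
         moptions->inplace_callback != nullptr;
}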

db/memtable_list.cc
@@ -62,10 +62,9 @@ int MemTableList::size() const {
 // Return the most recent value found, if any.
 // Operands stores the list of merge operations to apply, so far.
 bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
-                              Status* s, MergeContext& merge_context,
-                              const Options& options) {
+                              Status* s, MergeContext& merge_context) {
   for (auto& memtable : memlist_) {
-    if (memtable->Get(key, value, s, merge_context, options)) {
+    if (memtable->Get(key, value, s, merge_context)) {
       return true;
     }
   }

db/memtable_list.h
@@ -46,7 +46,7 @@ class MemTableListVersion {
   // Search all the memtables starting from the most recent one.
   // Return the most recent value found, if any.
   bool Get(const LookupKey& key, std::string* value, Status* s,
-           MergeContext& merge_context, const Options& options);
+           MergeContext& merge_context);
   void AddIterators(const ReadOptions& options,
                     std::vector<Iterator*>* iterator_list, Arena* arena);

db/repair.cc
@@ -219,7 +219,7 @@ class Repairer {
     std::string scratch;
     Slice record;
     WriteBatch batch;
-    MemTable* mem = new MemTable(icmp_, options_);
+    MemTable* mem = new MemTable(icmp_, ioptions_, MemTableOptions(options_));
     auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_);
     mem->Ref();
     int counter = 0;

db/write_batch.cc
@@ -23,7 +23,6 @@
 //    data: uint8[len]
 #include "rocksdb/write_batch.h"
-#include "rocksdb/options.h"
 #include "rocksdb/merge_operator.h"
 #include "db/dbformat.h"
 #include "db/db_impl.h"
@@ -350,14 +349,15 @@ class MemTableInserter : public WriteBatch::Handler {
       return seek_status;
     }
     MemTable* mem = cf_mems_->GetMemTable();
-    const Options* options = cf_mems_->GetOptions();
-    if (!options->inplace_update_support) {
+    auto* ioptions = mem->GetImmutableOptions();
+    auto* moptions = mem->GetMemTableOptions();
+    if (!moptions->inplace_update_support) {
      mem->Add(sequence_, kTypeValue, key, value);
-    } else if (options->inplace_callback == nullptr) {
+    } else if (moptions->inplace_callback == nullptr) {
      mem->Update(sequence_, key, value);
-      RecordTick(options->statistics.get(), NUMBER_KEYS_UPDATED);
+      RecordTick(ioptions->statistics, NUMBER_KEYS_UPDATED);
     } else {
-      if (mem->UpdateCallback(sequence_, key, value, *options)) {
+      if (mem->UpdateCallback(sequence_, key, value)) {
       } else {
        // key not found in memtable. Do sst get, update, add
        SnapshotImpl read_from_snapshot;
@@ -376,17 +376,17 @@ class MemTableInserter : public WriteBatch::Handler {
        char* prev_buffer = const_cast<char*>(prev_value.c_str());
        uint32_t prev_size = prev_value.size();
-        auto status = options->inplace_callback(s.ok() ? prev_buffer : nullptr,
-                                                s.ok() ? &prev_size : nullptr,
-                                                value, &merged_value);
+        auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
+                                                 s.ok() ? &prev_size : nullptr,
+                                                 value, &merged_value);
        if (status == UpdateStatus::UPDATED_INPLACE) {
          // prev_value is updated in-place with final value.
          mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
-          RecordTick(options->statistics.get(), NUMBER_KEYS_WRITTEN);
+          RecordTick(ioptions->statistics, NUMBER_KEYS_WRITTEN);
        } else if (status == UpdateStatus::UPDATED) {
          // merged_value contains the final value.
          mem->Add(sequence_, kTypeValue, key, Slice(merged_value));
-          RecordTick(options->statistics.get(), NUMBER_KEYS_WRITTEN);
+          RecordTick(ioptions->statistics, NUMBER_KEYS_WRITTEN);
        }
      }
    }
@@ -405,17 +405,18 @@ class MemTableInserter : public WriteBatch::Handler {
      return seek_status;
    }
    MemTable* mem = cf_mems_->GetMemTable();
-    const Options* options = cf_mems_->GetOptions();
+    auto* ioptions = mem->GetImmutableOptions();
+    auto* moptions = mem->GetMemTableOptions();
    bool perform_merge = false;
-    if (options->max_successive_merges > 0 && db_ != nullptr) {
+    if (moptions->max_successive_merges > 0 && db_ != nullptr) {
      LookupKey lkey(key, sequence_);
      // Count the number of successive merges at the head
      // of the key in the memtable
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
-      if (num_merges >= options->max_successive_merges) {
+      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }
@@ -439,16 +440,16 @@ class MemTableInserter : public WriteBatch::Handler {
        Slice get_value_slice = Slice(get_value);
        // 2) Apply this merge
-        auto merge_operator = options->merge_operator.get();
+        auto merge_operator = ioptions->merge_operator;
        assert(merge_operator);
        std::deque<std::string> operands;
        operands.push_front(value.ToString());
        std::string new_value;
        if (!merge_operator->FullMerge(key, &get_value_slice, operands,
-                                       &new_value, options->info_log.get())) {
+                                       &new_value, ioptions->info_log)) {
          // Failed to merge!
-          RecordTick(options->statistics.get(), NUMBER_MERGE_FAILURES);
+          RecordTick(ioptions->statistics, NUMBER_MERGE_FAILURES);
          // Store the delta in memtable
          perform_merge = false;
@@ -474,8 +475,9 @@ class MemTableInserter : public WriteBatch::Handler {
      return seek_status;
    }
    MemTable* mem = cf_mems_->GetMemTable();
-    const Options* options = cf_mems_->GetOptions();
-    if (!dont_filter_deletes_ && options->filter_deletes) {
+    auto* ioptions = mem->GetImmutableOptions();
+    auto* moptions = mem->GetMemTableOptions();
+    if (!dont_filter_deletes_ && moptions->filter_deletes) {
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions ropts;
@@ -486,7 +488,7 @@ class MemTableInserter : public WriteBatch::Handler {
        cf_handle = db_->DefaultColumnFamily();
      }
      if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
-        RecordTick(options->statistics.get(), NUMBER_FILTERED_DELETES);
+        RecordTick(ioptions->statistics, NUMBER_FILTERED_DELETES);
        return Status::OK();
      }
    }

db/write_batch_test.cc
@@ -27,7 +27,8 @@ static std::string PrintContents(WriteBatch* b) {
   auto factory = std::make_shared<SkipListFactory>();
   Options options;
   options.memtable_factory = factory;
-  MemTable* mem = new MemTable(cmp, options);
+  MemTable* mem = new MemTable(cmp, ImmutableCFOptions(options),
+                               MemTableOptions(options));
   mem->Ref();
   std::string state;
   ColumnFamilyMemTablesDefault cf_mems_default(mem, &options);

include/rocksdb/immutable_options.h
@@ -51,6 +51,8 @@ struct ImmutableCFOptions {
   std::vector<DbPath> db_paths;
+  MemTableRepFactory* memtable_factory;
   TableFactory* table_factory;
   Options::TablePropertiesCollectorFactories

table/table_test.cc
@@ -437,21 +437,25 @@ class MemTableConstructor: public Constructor {
        table_factory_(new SkipListFactory) {
    Options options;
    options.memtable_factory = table_factory_;
-    memtable_ = new MemTable(internal_comparator_, options);
+    memtable_ = new MemTable(internal_comparator_,
+                             ImmutableCFOptions(options),
+                             MemTableOptions(options));
    memtable_->Ref();
  }
  ~MemTableConstructor() {
    delete memtable_->Unref();
  }
-  virtual Status FinishImpl(const Options& options,
+  virtual Status FinishImpl(const Options&,
                            const ImmutableCFOptions& ioptions,
                            const BlockBasedTableOptions& table_options,
                            const InternalKeyComparator& internal_comparator,
                            const KVMap& data) {
    delete memtable_->Unref();
-    Options memtable_options;
-    memtable_options.memtable_factory = table_factory_;
-    memtable_ = new MemTable(internal_comparator_, memtable_options);
+    Options options;
+    options.memtable_factory = table_factory_;
+    memtable_ = new MemTable(internal_comparator_,
+                             ImmutableCFOptions(options),
+                             MemTableOptions(options));
    memtable_->Ref();
    int seq = 1;
    for (KVMap::const_iterator it = data.begin();
@@ -1859,7 +1863,8 @@ TEST(MemTableTest, Simple) {
   auto table_factory = std::make_shared<SkipListFactory>();
   Options options;
   options.memtable_factory = table_factory;
-  MemTable* memtable = new MemTable(cmp, options);
+  MemTable* memtable = new MemTable(cmp, ImmutableCFOptions(options),
+                                    MemTableOptions(options));
   memtable->Ref();
   WriteBatch batch;
   WriteBatchInternal::SetSequence(&batch, 100);

util/options.cc
@@ -47,6 +47,7 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
       allow_mmap_reads(options.allow_mmap_reads),
       allow_mmap_writes(options.allow_mmap_writes),
       db_paths(options.db_paths),
+      memtable_factory(options.memtable_factory.get()),
       table_factory(options.table_factory.get()),
       table_properties_collector_factories(
           options.table_properties_collector_factories),
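Note (not part of the diff): this last hunk highlights that ImmutableCFOptions borrows raw pointers (memtable_factory, table_factory) from the shared_ptr members of Options, so whatever owns those factories, typically the Options it was built from, must outlive it. A minimal sketch of the safe usage, illustrative only:

#include <cassert>
#include <memory>
#include "rocksdb/immutable_options.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"

void ImmutableOptionsLifetimeSketch() {
  rocksdb::Options options;
  options.memtable_factory = std::make_shared<rocksdb::SkipListFactory>();
  // ImmutableCFOptions copies only raw pointers out of the shared_ptr
  // members, so `options` (or whatever owns the factories) must outlive it.
  rocksdb::ImmutableCFOptions ioptions(options);
  assert(ioptions.memtable_factory == options.memtable_factory.get());
}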
