|
|
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
|
|
// This source code is licensed under the BSD-style license found in the
|
|
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
//
|
|
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
|
|
|
|
#include "db/memtable.h"
|
|
|
|
|
|
|
|
#include <memory>
|
|
|
|
|
|
|
|
#include "db/dbformat.h"
|
|
|
|
#include "db/merge_context.h"
|
|
|
|
#include "rocksdb/comparator.h"
|
|
|
|
#include "rocksdb/env.h"
|
|
|
|
#include "rocksdb/iterator.h"
|
|
|
|
#include "rocksdb/merge_operator.h"
|
|
|
|
#include "rocksdb/slice_transform.h"
|
|
|
|
#include "util/arena.h"
|
|
|
|
#include "util/coding.h"
|
|
|
|
#include "util/murmurhash.h"
|
|
|
|
#include "util/mutexlock.h"
|
|
|
|
#include "util/perf_context_imp.h"
|
|
|
|
#include "util/statistics.h"
|
|
|
|
#include "util/stop_watch.h"
|
|
|
|
|
|
|
|
namespace rocksdb {
|
|
|
|
|
|
|
|
MemTable::MemTable(const InternalKeyComparator& cmp,
|
|
|
|
const ColumnFamilyOptions& options)
|
|
|
|
: comparator_(cmp),
|
|
|
|
refs_(0),
|
|
|
|
arena_(options.arena_block_size),
|
|
|
|
table_(options.memtable_factory->CreateMemTableRep(
|
|
|
|
comparator_, &arena_, options.prefix_extractor.get())),
|
|
|
|
flush_in_progress_(false),
|
|
|
|
flush_completed_(false),
|
|
|
|
file_number_(0),
|
|
|
|
first_seqno_(0),
|
|
|
|
mem_next_logfile_number_(0),
|
|
|
|
locks_(options.inplace_update_support ? options.inplace_update_num_locks
|
|
|
|
: 0),
|
|
|
|
prefix_extractor_(options.prefix_extractor.get()) {
|
|
|
|
if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
|
|
|
|
prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits,
|
|
|
|
options.memtable_prefix_bloom_probes));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
MemTable::~MemTable() {
|
|
|
|
assert(refs_ == 0);
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t MemTable::ApproximateMemoryUsage() {
|
|
|
|
return arena_.ApproximateMemoryUsage() + table_->ApproximateMemoryUsage();
|
|
|
|
}
|
|
|
|
|
|
|
|
int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
|
|
|
|
const char* prefix_len_key2) const {
|
|
|
|
// Internal keys are encoded as length-prefixed strings.
|
|
|
|
Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
|
|
|
|
Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
|
|
|
|
return comparator.Compare(k1, k2);
|
|
|
|
}
|
|
|
|
|
|
|
|
int MemTable::KeyComparator::operator()(const char* prefix_len_key,
|
|
|
|
const Slice& key)
|
|
|
|
const {
|
|
|
|
// Internal keys are encoded as length-prefixed strings.
|
|
|
|
Slice a = GetLengthPrefixedSlice(prefix_len_key);
|
|
|
|
return comparator.Compare(a, key);
|
|
|
|
}
|
|
|
|
|
|
|
|
Slice MemTableRep::UserKey(const char* key) const {
|
|
|
|
Slice slice = GetLengthPrefixedSlice(key);
|
|
|
|
return Slice(slice.data(), slice.size() - 8);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Encode a suitable internal key target for "target" and return it.
|
|
|
|
// Uses *scratch as scratch space, and the returned pointer will point
|
|
|
|
// into this scratch space.
|
|
|
|
const char* EncodeKey(std::string* scratch, const Slice& target) {
|
|
|
|
scratch->clear();
|
|
|
|
PutVarint32(scratch, target.size());
|
|
|
|
scratch->append(target.data(), target.size());
|
|
|
|
return scratch->data();
|
|
|
|
}
|
|
|
|
|
|
|
|
class MemTableIterator: public Iterator {
|
|
|
|
public:
|
|
|
|
MemTableIterator(const MemTable& mem, const ReadOptions& options)
|
|
|
|
: mem_(mem), iter_(), dynamic_prefix_seek_(false), valid_(false) {
|
|
|
|
if (options.prefix) {
|
|
|
|
iter_.reset(mem_.table_->GetPrefixIterator(*options.prefix));
|
|
|
|
} else if (options.prefix_seek) {
|
|
|
|
dynamic_prefix_seek_ = true;
|
|
|
|
iter_.reset(mem_.table_->GetDynamicPrefixIterator());
|
|
|
|
} else {
|
|
|
|
iter_.reset(mem_.table_->GetIterator());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
virtual bool Valid() const { return valid_; }
|
|
|
|
virtual void Seek(const Slice& k) {
|
|
|
|
if (dynamic_prefix_seek_ && mem_.prefix_bloom_ &&
|
|
|
|
!mem_.prefix_bloom_->MayContain(
|
|
|
|
mem_.prefix_extractor_->Transform(ExtractUserKey(k)))) {
|
|
|
|
valid_ = false;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
iter_->Seek(k, nullptr);
|
|
|
|
valid_ = iter_->Valid();
|
|
|
|
}
|
|
|
|
virtual void SeekToFirst() {
|
|
|
|
iter_->SeekToFirst();
|
|
|
|
valid_ = iter_->Valid();
|
|
|
|
}
|
|
|
|
virtual void SeekToLast() {
|
|
|
|
iter_->SeekToLast();
|
|
|
|
valid_ = iter_->Valid();
|
|
|
|
}
|
|
|
|
virtual void Next() {
|
|
|
|
assert(Valid());
|
|
|
|
iter_->Next();
|
|
|
|
valid_ = iter_->Valid();
|
|
|
|
}
|
|
|
|
virtual void Prev() {
|
|
|
|
assert(Valid());
|
|
|
|
iter_->Prev();
|
|
|
|
valid_ = iter_->Valid();
|
|
|
|
}
|
|
|
|
virtual Slice key() const {
|
|
|
|
assert(Valid());
|
|
|
|
return GetLengthPrefixedSlice(iter_->key());
|
|
|
|
}
|
|
|
|
virtual Slice value() const {
|
|
|
|
assert(Valid());
|
|
|
|
Slice key_slice = GetLengthPrefixedSlice(iter_->key());
|
|
|
|
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
|
|
|
|
}
|
|
|
|
|
|
|
|
virtual Status status() const { return Status::OK(); }
|
|
|
|
|
|
|
|
private:
|
|
|
|
const MemTable& mem_;
|
|
|
|
std::shared_ptr<MemTableRep::Iterator> iter_;
|
|
|
|
bool dynamic_prefix_seek_;
|
|
|
|
bool valid_;
|
|
|
|
|
|
|
|
// No copying allowed
|
|
|
|
MemTableIterator(const MemTableIterator&);
|
|
|
|
void operator=(const MemTableIterator&);
|
|
|
|
};
|
|
|
|
|
|
|
|
Iterator* MemTable::NewIterator(const ReadOptions& options) {
|
|
|
|
return new MemTableIterator(*this, options);
|
|
|
|
}
|
|
|
|
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
port::RWMutex* MemTable::GetLock(const Slice& key) {
|
|
|
|
static murmur_hash hash;
|
|
|
|
return &locks_[hash(key) % locks_.size()];
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
}
|
|
|
|
|
|
|
|
void MemTable::Add(SequenceNumber s, ValueType type,
|
|
|
|
const Slice& key, /* user key */
|
|
|
|
const Slice& value) {
|
|
|
|
// Format of an entry is concatenation of:
|
|
|
|
// key_size : varint32 of internal_key.size()
|
|
|
|
// key bytes : char[internal_key.size()]
|
|
|
|
// value_size : varint32 of value.size()
|
|
|
|
// value bytes : char[value.size()]
|
|
|
|
size_t key_size = key.size();
|
|
|
|
size_t val_size = value.size();
|
|
|
|
size_t internal_key_size = key_size + 8;
|
|
|
|
const size_t encoded_len =
|
|
|
|
VarintLength(internal_key_size) + internal_key_size +
|
|
|
|
VarintLength(val_size) + val_size;
|
|
|
|
char* buf = arena_.Allocate(encoded_len);
|
|
|
|
char* p = EncodeVarint32(buf, internal_key_size);
|
|
|
|
memcpy(p, key.data(), key_size);
|
|
|
|
p += key_size;
|
|
|
|
EncodeFixed64(p, (s << 8) | type);
|
|
|
|
p += 8;
|
|
|
|
p = EncodeVarint32(p, val_size);
|
|
|
|
memcpy(p, value.data(), val_size);
|
|
|
|
assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
|
|
|
|
table_->Insert(buf);
|
|
|
|
|
|
|
|
if (prefix_bloom_) {
|
|
|
|
assert(prefix_extractor_);
|
|
|
|
prefix_bloom_->Add(prefix_extractor_->Transform(key));
|
|
|
|
}
|
|
|
|
|
|
|
|
// The first sequence number inserted into the memtable
|
|
|
|
assert(first_seqno_ == 0 || s > first_seqno_);
|
|
|
|
if (first_seqno_ == 0) {
|
|
|
|
first_seqno_ = s;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Callback from MemTable::Get()
|
|
|
|
namespace {
|
|
|
|
|
|
|
|
struct Saver {
|
|
|
|
Status* status;
|
|
|
|
const LookupKey* key;
|
|
|
|
bool* found_final_value; // Is value set correctly? Used by KeyMayExist
|
|
|
|
bool* merge_in_progress;
|
|
|
|
std::string* value;
|
|
|
|
const MergeOperator* merge_operator;
|
|
|
|
// the merge operations encountered;
|
|
|
|
MergeContext* merge_context;
|
|
|
|
MemTable* mem;
|
|
|
|
Logger* logger;
|
|
|
|
Statistics* statistics;
|
|
|
|
bool inplace_update_support;
|
|
|
|
};
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
static bool SaveValue(void* arg, const char* entry) {
|
|
|
|
Saver* s = reinterpret_cast<Saver*>(arg);
|
|
|
|
MergeContext* merge_context = s->merge_context;
|
|
|
|
const MergeOperator* merge_operator = s->merge_operator;
|
|
|
|
|
|
|
|
assert(s != nullptr && merge_context != nullptr);
|
|
|
|
|
|
|
|
// entry format is:
|
|
|
|
// klength varint32
|
|
|
|
// userkey char[klength-8]
|
|
|
|
// tag uint64
|
|
|
|
// vlength varint32
|
|
|
|
// value char[vlength]
|
|
|
|
// Check that it belongs to same user key. We do not check the
|
|
|
|
// sequence number since the Seek() call above should have skipped
|
|
|
|
// all entries with overly large sequence numbers.
|
|
|
|
uint32_t key_length;
|
|
|
|
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
|
|
|
|
if (s->mem->GetInternalKeyComparator().user_comparator()->Compare(
|
|
|
|
Slice(key_ptr, key_length - 8), s->key->user_key()) == 0) {
|
|
|
|
// Correct user key
|
|
|
|
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
|
|
|
|
switch (static_cast<ValueType>(tag & 0xff)) {
|
|
|
|
case kTypeValue: {
|
|
|
|
if (s->inplace_update_support) {
|
|
|
|
s->mem->GetLock(s->key->user_key())->ReadLock();
|
|
|
|
}
|
|
|
|
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
|
|
|
|
*(s->status) = Status::OK();
|
|
|
|
if (*(s->merge_in_progress)) {
|
|
|
|
assert(merge_operator);
|
|
|
|
if (!merge_operator->FullMerge(s->key->user_key(), &v,
|
|
|
|
merge_context->GetOperands(), s->value,
|
|
|
|
s->logger)) {
|
|
|
|
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
|
|
|
|
*(s->status) =
|
|
|
|
Status::Corruption("Error: Could not perform merge.");
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
s->value->assign(v.data(), v.size());
|
|
|
|
}
|
|
|
|
if (s->inplace_update_support) {
|
|
|
|
s->mem->GetLock(s->key->user_key())->Unlock();
|
|
|
|
}
|
|
|
|
*(s->found_final_value) = true;
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
case kTypeDeletion: {
|
|
|
|
if (*(s->merge_in_progress)) {
|
|
|
|
assert(merge_operator);
|
|
|
|
*(s->status) = Status::OK();
|
|
|
|
if (!merge_operator->FullMerge(s->key->user_key(), nullptr,
|
|
|
|
merge_context->GetOperands(), s->value,
|
|
|
|
s->logger)) {
|
|
|
|
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
|
|
|
|
*(s->status) =
|
|
|
|
Status::Corruption("Error: Could not perform merge.");
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
*(s->status) = Status::NotFound();
|
|
|
|
}
|
|
|
|
*(s->found_final_value) = true;
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
case kTypeMerge: {
|
|
|
|
std::string merge_result; // temporary area for merge results later
|
|
|
|
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
|
|
|
|
*(s->merge_in_progress) = true;
|
|
|
|
merge_context->PushOperand(v);
|
|
|
|
while (merge_context->GetNumOperands() >= 2) {
|
|
|
|
// Attempt to associative merge. (Returns true if successful)
|
|
|
|
if (merge_operator->PartialMerge(
|
|
|
|
s->key->user_key(), merge_context->GetOperand(0),
|
|
|
|
merge_context->GetOperand(1), &merge_result, s->logger)) {
|
|
|
|
merge_context->PushPartialMergeResult(merge_result);
|
|
|
|
} else {
|
|
|
|
// Stack them because user can't associative merge
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
assert(false);
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// s->state could be Corrupt, merge or notfound
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
|
|
|
MergeContext& merge_context, const Options& options) {
|
|
|
|
StopWatchNano memtable_get_timer(options.env, false);
|
|
|
|
StartPerfTimer(&memtable_get_timer);
|
|
|
|
|
|
|
|
Slice user_key = key.user_key();
|
|
|
|
bool found_final_value = false;
|
|
|
|
bool merge_in_progress = s->IsMergeInProgress();
|
|
|
|
|
|
|
|
if (prefix_bloom_ &&
|
|
|
|
!prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key))) {
|
|
|
|
// iter is null if prefix bloom says the key does not exist
|
|
|
|
} else {
|
|
|
|
Saver saver;
|
|
|
|
saver.status = s;
|
|
|
|
saver.found_final_value = &found_final_value;
|
|
|
|
saver.merge_in_progress = &merge_in_progress;
|
|
|
|
saver.key = &key;
|
|
|
|
saver.value = value;
|
|
|
|
saver.status = s;
|
|
|
|
saver.mem = this;
|
|
|
|
saver.merge_context = &merge_context;
|
|
|
|
saver.merge_operator = options.merge_operator.get();
|
|
|
|
saver.logger = options.info_log.get();
|
|
|
|
saver.inplace_update_support = options.inplace_update_support;
|
|
|
|
saver.statistics = options.statistics.get();
|
|
|
|
table_->Get(key, &saver, SaveValue);
|
|
|
|
}
|
|
|
|
|
[RocksDB] [MergeOperator] The new Merge Interface! Uses merge sequences.
Summary:
Here are the major changes to the Merge Interface. It has been expanded
to handle cases where the MergeOperator is not associative. It does so by stacking
up merge operations while scanning through the key history (i.e.: during Get() or
Compaction), until a valid Put/Delete/end-of-history is encountered; it then
applies all of the merge operations in the correct sequence starting with the
base/sentinel value.
I have also introduced an "AssociativeMerge" function which allows the user to
take advantage of associative merge operations (such as in the case of counters).
The implementation will always attempt to merge the operations/operands themselves
together when they are encountered, and will resort to the "stacking" method if
and only if the "associative-merge" fails.
This implementation is conjectured to allow MergeOperator to handle the general
case, while still providing the user with the ability to take advantage of certain
efficiencies in their own merge-operator / data-structure.
NOTE: This is a preliminary diff. This must still go through a lot of review,
revision, and testing. Feedback welcome!
Test Plan:
-This is a preliminary diff. I have only just begun testing/debugging it.
-I will be testing this with the existing MergeOperator use-cases and unit-tests
(counters, string-append, and redis-lists)
-I will be "desk-checking" and walking through the code with the help gdb.
-I will find a way of stress-testing the new interface / implementation using
db_bench, db_test, merge_test, and/or db_stress.
-I will ensure that my tests cover all cases: Get-Memtable,
Get-Immutable-Memtable, Get-from-Disk, Iterator-Range-Scan, Flush-Memtable-to-L0,
Compaction-L0-L1, Compaction-Ln-L(n+1), Put/Delete found, Put/Delete not-found,
end-of-history, end-of-file, etc.
-A lot of feedback from the reviewers.
Reviewers: haobo, dhruba, zshao, emayanke
Reviewed By: haobo
CC: leveldb
Differential Revision: https://reviews.facebook.net/D11499
12 years ago
|
|
|
// No change to value, since we have not yet found a Put/Delete
|
|
|
|
if (!found_final_value && merge_in_progress) {
|
|
|
|
*s = Status::MergeInProgress("");
|
|
|
|
}
|
|
|
|
BumpPerfTime(&perf_context.get_from_memtable_time, &memtable_get_timer);
|
|
|
|
BumpPerfCount(&perf_context.get_from_memtable_count);
|
|
|
|
return found_final_value;
|
|
|
|
}
|
|
|
|
|
|
|
|
void MemTable::Update(SequenceNumber seq,
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
const Slice& key,
|
|
|
|
const Slice& value) {
|
|
|
|
LookupKey lkey(key, seq);
|
|
|
|
Slice mem_key = lkey.memtable_key();
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
|
|
|
|
std::unique_ptr<MemTableRep::Iterator> iter(
|
|
|
|
table_->GetIterator(lkey.user_key()));
|
|
|
|
iter->Seek(lkey.internal_key(), mem_key.data());
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
|
|
|
|
if (iter->Valid()) {
|
|
|
|
// entry format is:
|
|
|
|
// key_length varint32
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
// userkey char[klength-8]
|
|
|
|
// tag uint64
|
|
|
|
// vlength varint32
|
|
|
|
// value char[vlength]
|
|
|
|
// Check that it belongs to same user key. We do not check the
|
|
|
|
// sequence number since the Seek() call above should have skipped
|
|
|
|
// all entries with overly large sequence numbers.
|
|
|
|
const char* entry = iter->key();
|
|
|
|
uint32_t key_length = 0;
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
|
|
|
|
if (comparator_.comparator.user_comparator()->Compare(
|
|
|
|
Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
|
|
|
|
// Correct user key
|
|
|
|
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
|
|
|
|
switch (static_cast<ValueType>(tag & 0xff)) {
|
|
|
|
case kTypeValue: {
|
|
|
|
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value inplace, if the merged value fits within the existing buffer size. But currently the existing buffer size is not being modified. Now the callback recieves a int* allowing the size to be modified. Since size is encoded as a varint in the internal key for memtable. It might happen that the entire value might have be copied to the new location if the new size varint is smaller than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
11 years ago
|
|
|
uint32_t prev_size = prev_value.size();
|
|
|
|
uint32_t new_size = value.size();
|
|
|
|
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value inplace, if the merged value fits within the existing buffer size. But currently the existing buffer size is not being modified. Now the callback recieves a int* allowing the size to be modified. Since size is encoded as a varint in the internal key for memtable. It might happen that the entire value might have be copied to the new location if the new size varint is smaller than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
11 years ago
|
|
|
// Update value, if new value size <= previous value size
|
|
|
|
if (new_size <= prev_size ) {
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value inplace, if the merged value fits within the existing buffer size. But currently the existing buffer size is not being modified. Now the callback recieves a int* allowing the size to be modified. Since size is encoded as a varint in the internal key for memtable. It might happen that the entire value might have be copied to the new location if the new size varint is smaller than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
11 years ago
|
|
|
new_size);
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
WriteLock wl(GetLock(lkey.user_key()));
|
|
|
|
memcpy(p, value.data(), value.size());
|
|
|
|
assert((unsigned)((p + value.size()) - entry) ==
|
|
|
|
(unsigned)(VarintLength(key_length) + key_length +
|
|
|
|
VarintLength(value.size()) + value.size()));
|
|
|
|
return;
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
}
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
// If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData
|
|
|
|
// we don't have enough space for update inplace
|
|
|
|
Add(seq, kTypeValue, key, value);
|
|
|
|
return;
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// key doesn't exist
|
|
|
|
Add(seq, kTypeValue, key, value);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool MemTable::UpdateCallback(SequenceNumber seq,
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value inplace, if the merged value fits within the existing buffer size. But currently the existing buffer size is not being modified. Now the callback recieves a int* allowing the size to be modified. Since size is encoded as a varint in the internal key for memtable. It might happen that the entire value might have be copied to the new location if the new size varint is smaller than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
11 years ago
|
|
|
const Slice& key,
|
|
|
|
const Slice& delta,
|
|
|
|
const Options& options) {
|
|
|
|
LookupKey lkey(key, seq);
|
|
|
|
Slice memkey = lkey.memtable_key();
|
|
|
|
|
|
|
|
std::shared_ptr<MemTableRep::Iterator> iter(
|
|
|
|
table_->GetIterator(lkey.user_key()));
|
|
|
|
iter->Seek(lkey.internal_key(), memkey.data());
|
|
|
|
|
|
|
|
if (iter->Valid()) {
|
|
|
|
// entry format is:
|
|
|
|
// key_length varint32
|
|
|
|
// userkey char[klength-8]
|
|
|
|
// tag uint64
|
|
|
|
// vlength varint32
|
|
|
|
// value char[vlength]
|
|
|
|
// Check that it belongs to same user key. We do not check the
|
|
|
|
// sequence number since the Seek() call above should have skipped
|
|
|
|
// all entries with overly large sequence numbers.
|
|
|
|
const char* entry = iter->key();
|
|
|
|
uint32_t key_length = 0;
|
|
|
|
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
|
|
|
|
if (comparator_.comparator.user_comparator()->Compare(
|
|
|
|
Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
|
|
|
|
// Correct user key
|
|
|
|
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
|
|
|
|
switch (static_cast<ValueType>(tag & 0xff)) {
|
|
|
|
case kTypeValue: {
|
|
|
|
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value inplace, if the merged value fits within the existing buffer size. But currently the existing buffer size is not being modified. Now the callback recieves a int* allowing the size to be modified. Since size is encoded as a varint in the internal key for memtable. It might happen that the entire value might have be copied to the new location if the new size varint is smaller than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
11 years ago
|
|
|
uint32_t prev_size = prev_value.size();
|
|
|
|
|
|
|
|
char* prev_buffer = const_cast<char*>(prev_value.data());
|
|
|
|
uint32_t new_prev_size = prev_size;
|
|
|
|
|
|
|
|
std::string str_value;
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value inplace, if the merged value fits within the existing buffer size. But currently the existing buffer size is not being modified. Now the callback recieves a int* allowing the size to be modified. Since size is encoded as a varint in the internal key for memtable. It might happen that the entire value might have be copied to the new location if the new size varint is smaller than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
11 years ago
|
|
|
WriteLock wl(GetLock(lkey.user_key()));
|
|
|
|
auto status = options.inplace_callback(prev_buffer, &new_prev_size,
|
|
|
|
delta, &str_value);
|
|
|
|
if (status == UpdateStatus::UPDATED_INPLACE) {
|
|
|
|
// Value already updated by callback.
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value inplace, if the merged value fits within the existing buffer size. But currently the existing buffer size is not being modified. Now the callback recieves a int* allowing the size to be modified. Since size is encoded as a varint in the internal key for memtable. It might happen that the entire value might have be copied to the new location if the new size varint is smaller than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
11 years ago
|
|
|
assert(new_prev_size <= prev_size);
|
|
|
|
if (new_prev_size < prev_size) {
|
|
|
|
// overwrite the new prev_size
|
|
|
|
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
|
|
|
|
new_prev_size);
|
|
|
|
if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
|
|
|
|
// shift the value buffer as well.
|
|
|
|
memcpy(p, prev_buffer, new_prev_size);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED);
|
|
|
|
return true;
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value inplace, if the merged value fits within the existing buffer size. But currently the existing buffer size is not being modified. Now the callback recieves a int* allowing the size to be modified. Since size is encoded as a varint in the internal key for memtable. It might happen that the entire value might have be copied to the new location if the new size varint is smaller than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
11 years ago
|
|
|
} else if (status == UpdateStatus::UPDATED) {
|
|
|
|
Add(seq, kTypeValue, key, Slice(str_value));
|
|
|
|
RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN);
|
|
|
|
return true;
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value inplace, if the merged value fits within the existing buffer size. But currently the existing buffer size is not being modified. Now the callback recieves a int* allowing the size to be modified. Since size is encoded as a varint in the internal key for memtable. It might happen that the entire value might have be copied to the new location if the new size varint is smaller than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
11 years ago
|
|
|
} else if (status == UpdateStatus::UPDATE_FAILED) {
|
|
|
|
// No action required. Return.
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// If the latest value is not kTypeValue
|
|
|
|
// or key doesn't exist
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
|
|
|
|
Slice memkey = key.memtable_key();
|
|
|
|
|
|
|
|
// A total ordered iterator is costly for some memtablerep (prefix aware
|
|
|
|
// reps). By passing in the user key, we allow efficient iterator creation.
|
|
|
|
// The iterator only needs to be ordered within the same user key.
|
|
|
|
std::unique_ptr<MemTableRep::Iterator> iter(
|
|
|
|
table_->GetIterator(key.user_key()));
|
|
|
|
iter->Seek(key.internal_key(), memkey.data());
|
|
|
|
|
|
|
|
size_t num_successive_merges = 0;
|
|
|
|
|
|
|
|
for (; iter->Valid(); iter->Next()) {
|
|
|
|
const char* entry = iter->key();
|
|
|
|
uint32_t key_length = 0;
|
|
|
|
const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
|
|
|
|
if (!comparator_.comparator.user_comparator()->Compare(
|
|
|
|
Slice(iter_key_ptr, key_length - 8), key.user_key()) == 0) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
|
|
|
|
if (static_cast<ValueType>(tag & 0xff) != kTypeMerge) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
++num_successive_merges;
|
|
|
|
}
|
|
|
|
|
|
|
|
return num_successive_merges;
|
|
|
|
}
|
|
|
|
|
|
|
|
void MemTableRep::Get(const LookupKey& k, void* callback_args,
|
|
|
|
bool (*callback_func)(void* arg, const char* entry)) {
|
|
|
|
auto iter = GetIterator(k.user_key());
|
|
|
|
for (iter->Seek(k.internal_key(), k.memtable_key().data());
|
|
|
|
iter->Valid() && callback_func(callback_args, iter->key());
|
|
|
|
iter->Next()) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
} // namespace rocksdb
|