// Copyright (c) 2013, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/memtable.h" #include #include "db/dbformat.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/merge_operator.h" #include "util/coding.h" #include "util/mutexlock.h" #include "util/murmurhash.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" namespace std { template <> struct hash { size_t operator()(const rocksdb::Slice& slice) const { return MurmurHash(slice.data(), slice.size(), 0); } }; } namespace rocksdb { MemTable::MemTable(const InternalKeyComparator& cmp, std::shared_ptr table_factory, int numlevel, const Options& options) : comparator_(cmp), refs_(0), arena_impl_(options.arena_block_size), table_(table_factory->CreateMemTableRep(comparator_, &arena_impl_)), flush_in_progress_(false), flush_completed_(false), file_number_(0), edit_(numlevel), first_seqno_(0), mem_next_logfile_number_(0), mem_logfile_number_(0), locks_(options.inplace_update_support ? options.inplace_update_num_locks : 0) { } MemTable::~MemTable() { assert(refs_ == 0); } size_t MemTable::ApproximateMemoryUsage() { return arena_impl_.ApproximateMemoryUsage() + table_->ApproximateMemoryUsage(); } int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) const { // Internal keys are encoded as length-prefixed strings. Slice a = GetLengthPrefixedSlice(aptr); Slice b = GetLengthPrefixedSlice(bptr); return comparator.Compare(a, b); } Slice MemTableRep::UserKey(const char* key) const { Slice slice = GetLengthPrefixedSlice(key); return Slice(slice.data(), slice.size() - 8); } // Encode a suitable internal key target for "target" and return it. // Uses *scratch as scratch space, and the returned pointer will point // into this scratch space. const char* EncodeKey(std::string* scratch, const Slice& target) { scratch->clear(); PutVarint32(scratch, target.size()); scratch->append(target.data(), target.size()); return scratch->data(); } class MemTableIterator: public Iterator { public: MemTableIterator(MemTableRep* table, const ReadOptions& options) : iter_() { if (options.prefix) { iter_ = table->GetPrefixIterator(*options.prefix); } else if (options.prefix_seek) { iter_ = table->GetDynamicPrefixIterator(); } else { iter_ = table->GetIterator(); } } virtual bool Valid() const { return iter_->Valid(); } virtual void Seek(const Slice& k) { iter_->Seek(k, nullptr); } virtual void SeekToFirst() { iter_->SeekToFirst(); } virtual void SeekToLast() { iter_->SeekToLast(); } virtual void Next() { iter_->Next(); } virtual void Prev() { iter_->Prev(); } virtual Slice key() const { return GetLengthPrefixedSlice(iter_->key()); } virtual Slice value() const { Slice key_slice = GetLengthPrefixedSlice(iter_->key()); return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); } virtual Status status() const { return Status::OK(); } private: std::shared_ptr iter_; // No copying allowed MemTableIterator(const MemTableIterator&); void operator=(const MemTableIterator&); }; Iterator* MemTable::NewIterator(const ReadOptions& options) { return new MemTableIterator(table_.get(), options); } port::RWMutex* MemTable::GetLock(const Slice& key) { return &locks_[std::hash()(key) % locks_.size()]; } void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, const Slice& value) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] // value_size : varint32 of value.size() // value bytes : char[value.size()] size_t key_size = key.size(); size_t val_size = value.size(); size_t internal_key_size = key_size + 8; const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; char* buf = arena_impl_.Allocate(encoded_len); char* p = EncodeVarint32(buf, internal_key_size); memcpy(p, key.data(), key_size); p += key_size; EncodeFixed64(p, (s << 8) | type); p += 8; p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert((p + val_size) - buf == (unsigned)encoded_len); table_->Insert(buf); // The first sequence number inserted into the memtable assert(first_seqno_ == 0 || s > first_seqno_); if (first_seqno_ == 0) { first_seqno_ = s; } } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, std::deque* operands, const Options& options) { StopWatchNano memtable_get_timer(options.env, false); StartPerfTimer(&memtable_get_timer); Slice memkey = key.memtable_key(); std::shared_ptr iter( table_->GetIterator(key.user_key())); iter->Seek(key.user_key(), memkey.data()); // It is the caller's responsibility to allocate/delete operands list assert(operands != nullptr); bool merge_in_progress = s->IsMergeInProgress(); auto merge_operator = options.merge_operator.get(); auto logger = options.info_log; std::string merge_result; bool found_final_value = false; for (; !found_final_value && iter->Valid(); iter->Next()) { // entry format is: // klength varint32 // userkey char[klength-8] // tag uint64 // vlength varint32 // value char[vlength] // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. const char* entry = iter->key(); uint32_t key_length; const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); if (comparator_.comparator.user_comparator()->Compare( Slice(key_ptr, key_length - 8), key.user_key()) == 0) { // Correct user key const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast(tag & 0xff)) { case kTypeValue: { if (options.inplace_update_support) { GetLock(key.user_key())->ReadLock(); } Slice v = GetLengthPrefixedSlice(key_ptr + key_length); *s = Status::OK(); if (merge_in_progress) { assert(merge_operator); if (!merge_operator->FullMerge(key.user_key(), &v, *operands, value, logger.get())) { RecordTick(options.statistics, NUMBER_MERGE_FAILURES); *s = Status::Corruption("Error: Could not perform merge."); } } else { value->assign(v.data(), v.size()); } if (options.inplace_update_support) { GetLock(key.user_key())->Unlock(); } found_final_value = true; break; } case kTypeDeletion: { if (merge_in_progress) { assert(merge_operator); *s = Status::OK(); if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands, value, logger.get())) { RecordTick(options.statistics, NUMBER_MERGE_FAILURES); *s = Status::Corruption("Error: Could not perform merge."); } } else { *s = Status::NotFound(Slice()); } found_final_value = true; break; } case kTypeMerge: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); merge_in_progress = true; operands->push_front(v.ToString()); while(operands->size() >= 2) { // Attempt to associative merge. (Returns true if successful) if (merge_operator->PartialMerge(key.user_key(), Slice((*operands)[0]), Slice((*operands)[1]), &merge_result, logger.get())) { operands->pop_front(); swap(operands->front(), merge_result); } else { // Stack them because user can't associative merge break; } } break; } case kTypeLogData: assert(false); break; } } else { // exit loop if user key does not match break; } } // No change to value, since we have not yet found a Put/Delete if (!found_final_value && merge_in_progress) { *s = Status::MergeInProgress(""); } BumpPerfTime(&perf_context.get_from_memtable_time, &memtable_get_timer); BumpPerfCount(&perf_context.get_from_memtable_count); return found_final_value; } bool MemTable::Update(SequenceNumber seq, ValueType type, const Slice& key, const Slice& value) { LookupKey lkey(key, seq); Slice memkey = lkey.memtable_key(); std::shared_ptr iter( table_.get()->GetIterator(lkey.user_key())); iter->Seek(key, memkey.data()); if (iter->Valid()) { // entry format is: // klength varint32 // userkey char[klength-8] // tag uint64 // vlength varint32 // value char[vlength] // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. const char* entry = iter->key(); uint32_t key_length; const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); if (comparator_.comparator.user_comparator()->Compare( Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) { // Correct user key const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast(tag & 0xff)) { case kTypeValue: { uint32_t vlength; GetVarint32Ptr(key_ptr + key_length, key_ptr + key_length+5, &vlength); // Update value, if newValue size <= curValue size if (value.size() <= vlength) { char* p = EncodeVarint32(const_cast(key_ptr) + key_length, value.size()); WriteLock wl(GetLock(lkey.user_key())); memcpy(p, value.data(), value.size()); assert( (p + value.size()) - entry == (unsigned) (VarintLength(key_length) + key_length + VarintLength(value.size()) + value.size()) ); return true; } } default: // If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData // then we probably don't have enough space to update in-place // Maybe do something later // Return false, and do normal Add() return false; } } } // Key doesn't exist return false; } } // namespace rocksdb