|
|
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
|
|
// This source code is licensed under the BSD-style license found in the
|
|
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
|
|
|
|
#include "rocksdb/utilities/write_batch_with_index.h"
|
|
|
|
|
|
|
|
#include <memory>
|
|
|
|
|
|
|
|
#include "rocksdb/comparator.h"
|
|
|
|
#include "rocksdb/iterator.h"
|
|
|
|
#include "db/column_family.h"
|
|
|
|
#include "db/skiplist.h"
|
|
|
|
#include "util/arena.h"
|
|
|
|
|
|
|
|
namespace rocksdb {
|
|
|
|
|
|
|
|
// when direction == forward
|
|
|
|
// * current_at_base_ <=> base_iterator > delta_iterator
|
|
|
|
// when direction == backwards
|
|
|
|
// * current_at_base_ <=> base_iterator < delta_iterator
|
|
|
|
// always:
|
|
|
|
// * equal_keys_ <=> base_iterator == delta_iterator
|
|
|
|
class BaseDeltaIterator : public Iterator {
|
|
|
|
public:
|
|
|
|
BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
|
|
|
|
const Comparator* comparator)
|
|
|
|
: forward_(true),
|
|
|
|
current_at_base_(true),
|
|
|
|
equal_keys_(false),
|
|
|
|
status_(Status::OK()),
|
|
|
|
base_iterator_(base_iterator),
|
|
|
|
delta_iterator_(delta_iterator),
|
|
|
|
comparator_(comparator) {}
|
|
|
|
|
|
|
|
virtual ~BaseDeltaIterator() {}
|
|
|
|
|
|
|
|
bool Valid() const override {
|
|
|
|
return current_at_base_ ? BaseValid() : DeltaValid();
|
|
|
|
}
|
|
|
|
|
|
|
|
void SeekToFirst() override {
|
|
|
|
forward_ = true;
|
|
|
|
base_iterator_->SeekToFirst();
|
|
|
|
delta_iterator_->SeekToFirst();
|
|
|
|
UpdateCurrent();
|
|
|
|
}
|
|
|
|
|
|
|
|
void SeekToLast() override {
|
|
|
|
forward_ = false;
|
|
|
|
base_iterator_->SeekToLast();
|
|
|
|
delta_iterator_->SeekToLast();
|
|
|
|
UpdateCurrent();
|
|
|
|
}
|
|
|
|
|
|
|
|
void Seek(const Slice& key) override {
|
|
|
|
forward_ = true;
|
|
|
|
base_iterator_->Seek(key);
|
|
|
|
delta_iterator_->Seek(key);
|
|
|
|
UpdateCurrent();
|
|
|
|
}
|
|
|
|
|
|
|
|
void Next() override {
|
|
|
|
if (!Valid()) {
|
|
|
|
status_ = Status::NotSupported("Next() on invalid iterator");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!forward_) {
|
|
|
|
// Need to change direction
|
|
|
|
// if our direction was backward and we're not equal, we have two states:
|
|
|
|
// * both iterators are valid: we're already in a good state (current
|
|
|
|
// shows to smaller)
|
|
|
|
// * only one iterator is valid: we need to advance that iterator
|
|
|
|
forward_ = true;
|
|
|
|
equal_keys_ = false;
|
|
|
|
if (!BaseValid()) {
|
|
|
|
assert(DeltaValid());
|
|
|
|
base_iterator_->SeekToFirst();
|
|
|
|
} else if (!DeltaValid()) {
|
|
|
|
delta_iterator_->SeekToFirst();
|
|
|
|
} else if (current_at_base_) {
|
|
|
|
// Change delta from larger than base to smaller
|
|
|
|
AdvanceDelta();
|
|
|
|
} else {
|
|
|
|
// Change base from larger than delta to smaller
|
|
|
|
AdvanceBase();
|
|
|
|
}
|
|
|
|
if (DeltaValid() && BaseValid()) {
|
|
|
|
if (Compare() == 0) {
|
|
|
|
equal_keys_ = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Advance();
|
|
|
|
}
|
|
|
|
|
|
|
|
void Prev() override {
|
|
|
|
if (!Valid()) {
|
|
|
|
status_ = Status::NotSupported("Prev() on invalid iterator");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (forward_) {
|
|
|
|
// Need to change direction
|
|
|
|
// if our direction was backward and we're not equal, we have two states:
|
|
|
|
// * both iterators are valid: we're already in a good state (current
|
|
|
|
// shows to smaller)
|
|
|
|
// * only one iterator is valid: we need to advance that iterator
|
|
|
|
forward_ = false;
|
|
|
|
equal_keys_ = false;
|
|
|
|
if (!BaseValid()) {
|
|
|
|
assert(DeltaValid());
|
|
|
|
base_iterator_->SeekToLast();
|
|
|
|
} else if (!DeltaValid()) {
|
|
|
|
delta_iterator_->SeekToLast();
|
|
|
|
} else if (current_at_base_) {
|
|
|
|
// Change delta from less advanced than base to more advanced
|
|
|
|
AdvanceDelta();
|
|
|
|
} else {
|
|
|
|
// Change base from less advanced than delta to more advanced
|
|
|
|
AdvanceBase();
|
|
|
|
}
|
|
|
|
if (DeltaValid() && BaseValid()) {
|
|
|
|
if (Compare() == 0) {
|
|
|
|
equal_keys_ = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Advance();
|
|
|
|
}
|
|
|
|
|
|
|
|
Slice key() const override {
|
|
|
|
return current_at_base_ ? base_iterator_->key()
|
|
|
|
: delta_iterator_->Entry().key;
|
|
|
|
}
|
|
|
|
|
|
|
|
Slice value() const override {
|
|
|
|
return current_at_base_ ? base_iterator_->value()
|
|
|
|
: delta_iterator_->Entry().value;
|
|
|
|
}
|
|
|
|
|
|
|
|
Status status() const {
|
|
|
|
if (!status_.ok()) {
|
|
|
|
return status_;
|
|
|
|
}
|
|
|
|
if (!base_iterator_->status().ok()) {
|
|
|
|
return base_iterator_->status();
|
|
|
|
}
|
|
|
|
return delta_iterator_->status();
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
// -1 -- delta less advanced than base
|
|
|
|
// 0 -- delta == base
|
|
|
|
// 1 -- delta more advanced than base
|
|
|
|
int Compare() const {
|
|
|
|
assert(delta_iterator_->Valid() && base_iterator_->Valid());
|
|
|
|
int cmp = comparator_->Compare(delta_iterator_->Entry().key,
|
|
|
|
base_iterator_->key());
|
|
|
|
if (forward_) {
|
|
|
|
return cmp;
|
|
|
|
} else {
|
|
|
|
return -cmp;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
bool IsDeltaDelete() {
|
|
|
|
assert(DeltaValid());
|
|
|
|
return delta_iterator_->Entry().type == kDeleteRecord;
|
|
|
|
}
|
|
|
|
void AssertInvariants() {
|
|
|
|
#ifndef NDEBUG
|
|
|
|
if (!Valid()) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
if (!BaseValid()) {
|
|
|
|
assert(!current_at_base_ && delta_iterator_->Valid());
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
if (!DeltaValid()) {
|
|
|
|
assert(current_at_base_ && base_iterator_->Valid());
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
// we don't support those yet
|
|
|
|
assert(delta_iterator_->Entry().type != kMergeRecord &&
|
|
|
|
delta_iterator_->Entry().type != kLogDataRecord);
|
|
|
|
int compare = comparator_->Compare(delta_iterator_->Entry().key,
|
|
|
|
base_iterator_->key());
|
|
|
|
if (forward_) {
|
|
|
|
// current_at_base -> compare < 0
|
|
|
|
assert(!current_at_base_ || compare < 0);
|
|
|
|
// !current_at_base -> compare <= 0
|
|
|
|
assert(current_at_base_ && compare >= 0);
|
|
|
|
} else {
|
|
|
|
// current_at_base -> compare > 0
|
|
|
|
assert(!current_at_base_ || compare > 0);
|
|
|
|
// !current_at_base -> compare <= 0
|
|
|
|
assert(current_at_base_ && compare <= 0);
|
|
|
|
}
|
|
|
|
// equal_keys_ <=> compare == 0
|
|
|
|
assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
|
|
|
|
void Advance() {
|
|
|
|
if (equal_keys_) {
|
|
|
|
assert(BaseValid() && DeltaValid());
|
|
|
|
AdvanceBase();
|
|
|
|
AdvanceDelta();
|
|
|
|
} else {
|
|
|
|
if (current_at_base_) {
|
|
|
|
assert(BaseValid());
|
|
|
|
AdvanceBase();
|
|
|
|
} else {
|
|
|
|
assert(DeltaValid());
|
|
|
|
AdvanceDelta();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
UpdateCurrent();
|
|
|
|
}
|
|
|
|
|
|
|
|
void AdvanceDelta() {
|
|
|
|
if (forward_) {
|
|
|
|
delta_iterator_->Next();
|
|
|
|
} else {
|
|
|
|
delta_iterator_->Prev();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
void AdvanceBase() {
|
|
|
|
if (forward_) {
|
|
|
|
base_iterator_->Next();
|
|
|
|
} else {
|
|
|
|
base_iterator_->Prev();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
bool BaseValid() const { return base_iterator_->Valid(); }
|
|
|
|
bool DeltaValid() const { return delta_iterator_->Valid(); }
|
|
|
|
void UpdateCurrent() {
|
|
|
|
while (true) {
|
|
|
|
equal_keys_ = false;
|
|
|
|
if (!BaseValid()) {
|
|
|
|
// Base has finished.
|
|
|
|
if (!DeltaValid()) {
|
|
|
|
// Finished
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
if (IsDeltaDelete()) {
|
|
|
|
AdvanceDelta();
|
|
|
|
} else {
|
|
|
|
current_at_base_ = false;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
} else if (!DeltaValid()) {
|
|
|
|
// Delta has finished.
|
|
|
|
current_at_base_ = true;
|
|
|
|
return;
|
|
|
|
} else {
|
|
|
|
int compare = Compare();
|
|
|
|
if (compare <= 0) { // delta bigger or equal
|
|
|
|
if (compare == 0) {
|
|
|
|
equal_keys_ = true;
|
|
|
|
}
|
|
|
|
if (!IsDeltaDelete()) {
|
|
|
|
current_at_base_ = false;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
// Delta is less advanced and is delete.
|
|
|
|
AdvanceDelta();
|
|
|
|
if (equal_keys_) {
|
|
|
|
AdvanceBase();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
current_at_base_ = true;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
AssertInvariants();
|
|
|
|
}
|
|
|
|
|
|
|
|
bool forward_;
|
|
|
|
bool current_at_base_;
|
|
|
|
bool equal_keys_;
|
|
|
|
Status status_;
|
|
|
|
std::unique_ptr<Iterator> base_iterator_;
|
|
|
|
std::unique_ptr<WBWIIterator> delta_iterator_;
|
|
|
|
const Comparator* comparator_; // not owned
|
|
|
|
};
|
|
|
|
|
|
|
|
class ReadableWriteBatch : public WriteBatch {
|
|
|
|
public:
|
|
|
|
explicit ReadableWriteBatch(size_t reserved_bytes = 0)
|
|
|
|
: WriteBatch(reserved_bytes) {}
|
|
|
|
// Retrieve some information from a write entry in the write batch, given
|
|
|
|
// the start offset of the write entry.
|
|
|
|
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
|
|
|
|
Slice* value, Slice* blob) const;
|
|
|
|
};
|
|
|
|
|
|
|
|
// Key used by skip list, as the binary searchable index of WriteBatchWithIndex.
|
|
|
|
struct WriteBatchIndexEntry {
|
|
|
|
WriteBatchIndexEntry(size_t o, uint32_t c)
|
|
|
|
: offset(o), column_family(c), search_key(nullptr) {}
|
|
|
|
WriteBatchIndexEntry(const Slice* sk, uint32_t c)
|
|
|
|
: offset(0), column_family(c), search_key(sk) {}
|
|
|
|
|
|
|
|
// If this flag appears in the offset, it indicates a key that is smaller
|
|
|
|
// than any other entry for the same column family
|
|
|
|
static const size_t kFlagMin = std::numeric_limits<size_t>::max();
|
|
|
|
|
|
|
|
size_t offset; // offset of an entry in write batch's string buffer.
|
|
|
|
uint32_t column_family; // column family of the entry
|
|
|
|
const Slice* search_key; // if not null, instead of reading keys from
|
|
|
|
// write batch, use it to compare. This is used
|
|
|
|
// for lookup key.
|
|
|
|
};
|
|
|
|
|
|
|
|
class WriteBatchEntryComparator {
|
|
|
|
public:
|
|
|
|
WriteBatchEntryComparator(const Comparator* default_comparator,
|
|
|
|
const ReadableWriteBatch* write_batch)
|
|
|
|
: default_comparator_(default_comparator), write_batch_(write_batch) {}
|
|
|
|
// Compare a and b. Return a negative value if a is less than b, 0 if they
|
|
|
|
// are equal, and a positive value if a is greater than b
|
|
|
|
int operator()(const WriteBatchIndexEntry* entry1,
|
|
|
|
const WriteBatchIndexEntry* entry2) const;
|
|
|
|
|
|
|
|
int CompareKey(uint32_t column_family, const Slice& key1,
|
|
|
|
const Slice& key2) const;
|
|
|
|
|
|
|
|
void SetComparatorForCF(uint32_t column_family_id,
|
|
|
|
const Comparator* comparator) {
|
|
|
|
cf_comparator_map_[column_family_id] = comparator;
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
const Comparator* default_comparator_;
|
|
|
|
std::unordered_map<uint32_t, const Comparator*> cf_comparator_map_;
|
|
|
|
const ReadableWriteBatch* write_batch_;
|
|
|
|
};
|
|
|
|
|
|
|
|
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
|
|
|
|
WriteBatchEntrySkipList;
|
|
|
|
|
|
|
|
class WBWIIteratorImpl : public WBWIIterator {
|
|
|
|
public:
|
|
|
|
WBWIIteratorImpl(uint32_t column_family_id,
|
|
|
|
WriteBatchEntrySkipList* skip_list,
|
|
|
|
const ReadableWriteBatch* write_batch)
|
|
|
|
: column_family_id_(column_family_id),
|
|
|
|
skip_list_iter_(skip_list),
|
|
|
|
write_batch_(write_batch),
|
|
|
|
valid_(false) {}
|
|
|
|
|
|
|
|
virtual ~WBWIIteratorImpl() {}
|
|
|
|
|
|
|
|
virtual bool Valid() const override { return valid_; }
|
|
|
|
|
|
|
|
virtual void SeekToFirst() {
|
|
|
|
valid_ = true;
|
|
|
|
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
|
|
|
|
column_family_id_);
|
|
|
|
skip_list_iter_.Seek(&search_entry);
|
|
|
|
ReadEntry();
|
|
|
|
}
|
|
|
|
|
|
|
|
virtual void SeekToLast() {
|
|
|
|
valid_ = true;
|
|
|
|
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
|
|
|
|
column_family_id_ + 1);
|
|
|
|
skip_list_iter_.Seek(&search_entry);
|
|
|
|
if (!skip_list_iter_.Valid()) {
|
|
|
|
skip_list_iter_.SeekToLast();
|
|
|
|
} else {
|
|
|
|
skip_list_iter_.Prev();
|
|
|
|
}
|
|
|
|
ReadEntry();
|
|
|
|
}
|
|
|
|
|
|
|
|
virtual void Seek(const Slice& key) override {
|
|
|
|
valid_ = true;
|
|
|
|
WriteBatchIndexEntry search_entry(&key, column_family_id_);
|
|
|
|
skip_list_iter_.Seek(&search_entry);
|
|
|
|
ReadEntry();
|
|
|
|
}
|
|
|
|
|
|
|
|
virtual void Next() override {
|
|
|
|
skip_list_iter_.Next();
|
|
|
|
ReadEntry();
|
|
|
|
}
|
|
|
|
|
|
|
|
virtual void Prev() override {
|
|
|
|
skip_list_iter_.Prev();
|
|
|
|
ReadEntry();
|
|
|
|
}
|
|
|
|
|
|
|
|
virtual const WriteEntry& Entry() const override { return current_; }
|
|
|
|
|
|
|
|
virtual Status status() const override { return status_; }
|
|
|
|
|
|
|
|
const WriteBatchIndexEntry* GetRawEntry() const {
|
|
|
|
return skip_list_iter_.key();
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
uint32_t column_family_id_;
|
|
|
|
WriteBatchEntrySkipList::Iterator skip_list_iter_;
|
|
|
|
const ReadableWriteBatch* write_batch_;
|
|
|
|
Status status_;
|
|
|
|
bool valid_;
|
|
|
|
WriteEntry current_;
|
|
|
|
|
|
|
|
void ReadEntry() {
|
|
|
|
if (!status_.ok() || !skip_list_iter_.Valid()) {
|
|
|
|
valid_ = false;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
|
|
|
|
if (iter_entry == nullptr ||
|
|
|
|
iter_entry->column_family != column_family_id_) {
|
|
|
|
valid_ = false;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
Slice blob;
|
|
|
|
status_ = write_batch_->GetEntryFromDataOffset(
|
|
|
|
iter_entry->offset, ¤t_.type, ¤t_.key, ¤t_.value,
|
|
|
|
&blob);
|
|
|
|
if (!status_.ok()) {
|
|
|
|
valid_ = false;
|
|
|
|
} else if (current_.type != kPutRecord && current_.type != kDeleteRecord &&
|
|
|
|
current_.type != kMergeRecord) {
|
|
|
|
valid_ = false;
|
|
|
|
status_ = Status::Corruption("write batch index is corrupted");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
struct WriteBatchWithIndex::Rep {
|
|
|
|
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
|
|
|
|
bool _overwrite_key = false)
|
|
|
|
: write_batch(reserved_bytes),
|
|
|
|
comparator(index_comparator, &write_batch),
|
|
|
|
skip_list(comparator, &arena),
|
|
|
|
overwrite_key(_overwrite_key),
|
|
|
|
last_entry_offset(0) {}
|
|
|
|
ReadableWriteBatch write_batch;
|
|
|
|
WriteBatchEntryComparator comparator;
|
|
|
|
Arena arena;
|
|
|
|
WriteBatchEntrySkipList skip_list;
|
|
|
|
bool overwrite_key;
|
|
|
|
size_t last_entry_offset;
|
|
|
|
|
|
|
|
// Remember current offset of internal write batch, which is used as
|
|
|
|
// the starting offset of the next record.
|
|
|
|
void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }
|
|
|
|
|
|
|
|
// In overwrite mode, find the existing entry for the same key and update it
|
|
|
|
// to point to the current entry.
|
|
|
|
// Return true if the key is found and updated.
|
|
|
|
bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
|
|
|
|
bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);
|
|
|
|
|
|
|
|
// Add the recent entry to the update.
|
|
|
|
// In overwrite mode, if key already exists in the index, update it.
|
|
|
|
void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
|
|
|
|
void AddOrUpdateIndex(const Slice& key);
|
|
|
|
|
|
|
|
// Allocate an index entry pointing to the last entry in the write batch and
|
|
|
|
// put it to skip list.
|
|
|
|
void AddNewEntry(uint32_t column_family_id);
|
|
|
|
};
|
|
|
|
|
|
|
|
bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
|
|
|
|
ColumnFamilyHandle* column_family, const Slice& key) {
|
|
|
|
uint32_t cf_id = GetColumnFamilyID(column_family);
|
|
|
|
return UpdateExistingEntryWithCfId(cf_id, key);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
|
|
|
|
uint32_t column_family_id, const Slice& key) {
|
|
|
|
if (!overwrite_key) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
|
|
|
|
iter.Seek(key);
|
|
|
|
if (!iter.Valid()) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
WriteBatchIndexEntry* non_const_entry =
|
|
|
|
const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
|
|
|
|
non_const_entry->offset = last_entry_offset;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
|
|
|
|
ColumnFamilyHandle* column_family, const Slice& key) {
|
|
|
|
if (!UpdateExistingEntry(column_family, key)) {
|
|
|
|
uint32_t cf_id = GetColumnFamilyID(column_family);
|
|
|
|
const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
|
|
|
|
if (cf_cmp != nullptr) {
|
|
|
|
comparator.SetComparatorForCF(cf_id, cf_cmp);
|
|
|
|
}
|
|
|
|
AddNewEntry(cf_id);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
|
|
|
|
if (!UpdateExistingEntryWithCfId(0, key)) {
|
|
|
|
AddNewEntry(0);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
|
|
|
|
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
|
|
|
|
auto* index_entry =
|
|
|
|
new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id);
|
|
|
|
skip_list.Insert(index_entry);
|
|
|
|
}
|
|
|
|
|
|
|
|
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
|
|
|
|
WriteType* type, Slice* Key,
|
|
|
|
Slice* value,
|
|
|
|
Slice* blob) const {
|
|
|
|
if (type == nullptr || Key == nullptr || value == nullptr ||
|
|
|
|
blob == nullptr) {
|
|
|
|
return Status::InvalidArgument("Output parameters cannot be null");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (data_offset >= GetDataSize()) {
|
|
|
|
return Status::InvalidArgument("data offset exceed write batch size");
|
|
|
|
}
|
|
|
|
Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
|
|
|
|
char tag;
|
|
|
|
uint32_t column_family;
|
|
|
|
Status s =
|
|
|
|
ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob);
|
|
|
|
|
|
|
|
switch (tag) {
|
|
|
|
case kTypeColumnFamilyValue:
|
|
|
|
case kTypeValue:
|
|
|
|
*type = kPutRecord;
|
|
|
|
break;
|
|
|
|
case kTypeColumnFamilyDeletion:
|
|
|
|
case kTypeDeletion:
|
|
|
|
*type = kDeleteRecord;
|
|
|
|
break;
|
|
|
|
case kTypeColumnFamilyMerge:
|
|
|
|
case kTypeMerge:
|
|
|
|
*type = kMergeRecord;
|
|
|
|
break;
|
|
|
|
case kTypeLogData:
|
|
|
|
*type = kLogDataRecord;
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
return Status::Corruption("unknown WriteBatch tag");
|
|
|
|
}
|
|
|
|
return Status::OK();
|
|
|
|
}
|
|
|
|
|
|
|
|
WriteBatchWithIndex::WriteBatchWithIndex(
|
|
|
|
const Comparator* default_index_comparator, size_t reserved_bytes,
|
|
|
|
bool overwrite_key)
|
|
|
|
: rep(new Rep(default_index_comparator, reserved_bytes, overwrite_key)) {}
|
|
|
|
|
|
|
|
WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; }
|
|
|
|
|
|
|
|
WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
|
|
|
|
|
|
|
|
WBWIIterator* WriteBatchWithIndex::NewIterator() {
|
|
|
|
return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
|
|
|
|
}
|
|
|
|
|
|
|
|
WBWIIterator* WriteBatchWithIndex::NewIterator(
|
|
|
|
ColumnFamilyHandle* column_family) {
|
|
|
|
return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
|
|
|
|
&(rep->skip_list), &rep->write_batch);
|
|
|
|
}
|
|
|
|
|
|
|
|
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
|
|
|
|
ColumnFamilyHandle* column_family, Iterator* base_iterator) {
|
|
|
|
if (rep->overwrite_key == false) {
|
|
|
|
assert(false);
|
|
|
|
return nullptr;
|
|
|
|
}
|
|
|
|
return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
|
|
|
|
GetColumnFamilyUserComparator(column_family));
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
|
|
|
|
const Slice& key, const Slice& value) {
|
|
|
|
rep->SetLastEntryOffset();
|
|
|
|
rep->write_batch.Put(column_family, key, value);
|
|
|
|
rep->AddOrUpdateIndex(column_family, key);
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
|
|
|
|
rep->SetLastEntryOffset();
|
|
|
|
rep->write_batch.Put(key, value);
|
|
|
|
rep->AddOrUpdateIndex(key);
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
|
|
|
|
const Slice& key, const Slice& value) {
|
|
|
|
rep->SetLastEntryOffset();
|
|
|
|
rep->write_batch.Merge(column_family, key, value);
|
|
|
|
rep->AddOrUpdateIndex(column_family, key);
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
|
|
|
|
rep->SetLastEntryOffset();
|
|
|
|
rep->write_batch.Merge(key, value);
|
|
|
|
rep->AddOrUpdateIndex(key);
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteBatchWithIndex::PutLogData(const Slice& blob) {
|
|
|
|
rep->write_batch.PutLogData(blob);
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
|
|
|
|
const Slice& key) {
|
|
|
|
rep->SetLastEntryOffset();
|
|
|
|
rep->write_batch.Delete(column_family, key);
|
|
|
|
rep->AddOrUpdateIndex(column_family, key);
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteBatchWithIndex::Delete(const Slice& key) {
|
|
|
|
rep->SetLastEntryOffset();
|
|
|
|
rep->write_batch.Delete(key);
|
|
|
|
rep->AddOrUpdateIndex(key);
|
|
|
|
}
|
|
|
|
|
|
|
|
int WriteBatchEntryComparator::operator()(
|
|
|
|
const WriteBatchIndexEntry* entry1,
|
|
|
|
const WriteBatchIndexEntry* entry2) const {
|
|
|
|
if (entry1->column_family > entry2->column_family) {
|
|
|
|
return 1;
|
|
|
|
} else if (entry1->column_family < entry2->column_family) {
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
|
|
|
|
return -1;
|
|
|
|
} else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
Status s;
|
|
|
|
Slice key1, key2;
|
|
|
|
if (entry1->search_key == nullptr) {
|
|
|
|
Slice value, blob;
|
|
|
|
WriteType write_type;
|
|
|
|
s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1,
|
|
|
|
&value, &blob);
|
|
|
|
if (!s.ok()) {
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
key1 = *(entry1->search_key);
|
|
|
|
}
|
|
|
|
if (entry2->search_key == nullptr) {
|
|
|
|
Slice value, blob;
|
|
|
|
WriteType write_type;
|
|
|
|
s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2,
|
|
|
|
&value, &blob);
|
|
|
|
if (!s.ok()) {
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
key2 = *(entry2->search_key);
|
|
|
|
}
|
|
|
|
|
|
|
|
int cmp = CompareKey(entry1->column_family, key1, key2);
|
|
|
|
if (cmp != 0) {
|
|
|
|
return cmp;
|
|
|
|
} else if (entry1->offset > entry2->offset) {
|
|
|
|
return 1;
|
|
|
|
} else if (entry1->offset < entry2->offset) {
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
|
|
|
|
const Slice& key1,
|
|
|
|
const Slice& key2) const {
|
|
|
|
auto comparator_for_cf = cf_comparator_map_.find(column_family);
|
|
|
|
if (comparator_for_cf != cf_comparator_map_.end()) {
|
|
|
|
return comparator_for_cf->second->Compare(key1, key2);
|
|
|
|
} else {
|
|
|
|
return default_comparator_->Compare(key1, key2);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
} // namespace rocksdb
|