fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
				
			
			
		
			You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							698 lines
						
					
					
						
							21 KiB
						
					
					
				
			
		
		
	
	
							698 lines
						
					
					
						
							21 KiB
						
					
					
				| //  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
 | |
| //  This source code is licensed under the BSD-style license found in the
 | |
| //  LICENSE file in the root directory of this source tree. An additional grant
 | |
| //  of patent rights can be found in the PATENTS file in the same directory.
 | |
| 
 | |
| #include "rocksdb/utilities/write_batch_with_index.h"
 | |
| 
 | |
| #include <memory>
 | |
| 
 | |
| #include "rocksdb/comparator.h"
 | |
| #include "rocksdb/iterator.h"
 | |
| #include "db/column_family.h"
 | |
| #include "db/skiplist.h"
 | |
| #include "util/arena.h"
 | |
| 
 | |
| namespace rocksdb {
 | |
| 
 | |
| // when direction == forward
 | |
| // * current_at_base_ <=> base_iterator > delta_iterator
 | |
| // when direction == backwards
 | |
| // * current_at_base_ <=> base_iterator < delta_iterator
 | |
| // always:
 | |
| // * equal_keys_ <=> base_iterator == delta_iterator
 | |
| class BaseDeltaIterator : public Iterator {
 | |
|  public:
 | |
|   BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
 | |
|                     const Comparator* comparator)
 | |
|       : forward_(true),
 | |
|         current_at_base_(true),
 | |
|         equal_keys_(false),
 | |
|         status_(Status::OK()),
 | |
|         base_iterator_(base_iterator),
 | |
|         delta_iterator_(delta_iterator),
 | |
|         comparator_(comparator) {}
 | |
| 
 | |
|   virtual ~BaseDeltaIterator() {}
 | |
| 
 | |
|   bool Valid() const override {
 | |
|     return current_at_base_ ? BaseValid() : DeltaValid();
 | |
|   }
 | |
| 
 | |
|   void SeekToFirst() override {
 | |
|     forward_ = true;
 | |
|     base_iterator_->SeekToFirst();
 | |
|     delta_iterator_->SeekToFirst();
 | |
|     UpdateCurrent();
 | |
|   }
 | |
| 
 | |
|   void SeekToLast() override {
 | |
|     forward_ = false;
 | |
|     base_iterator_->SeekToLast();
 | |
|     delta_iterator_->SeekToLast();
 | |
|     UpdateCurrent();
 | |
|   }
 | |
| 
 | |
|   void Seek(const Slice& key) override {
 | |
|     forward_ = true;
 | |
|     base_iterator_->Seek(key);
 | |
|     delta_iterator_->Seek(key);
 | |
|     UpdateCurrent();
 | |
|   }
 | |
| 
 | |
|   void Next() override {
 | |
|     if (!Valid()) {
 | |
|       status_ = Status::NotSupported("Next() on invalid iterator");
 | |
|     }
 | |
| 
 | |
|     if (!forward_) {
 | |
|       // Need to change direction
 | |
|       // if our direction was backward and we're not equal, we have two states:
 | |
|       // * both iterators are valid: we're already in a good state (current
 | |
|       // shows to smaller)
 | |
|       // * only one iterator is valid: we need to advance that iterator
 | |
|       forward_ = true;
 | |
|       equal_keys_ = false;
 | |
|       if (!BaseValid()) {
 | |
|         assert(DeltaValid());
 | |
|         base_iterator_->SeekToFirst();
 | |
|       } else if (!DeltaValid()) {
 | |
|         delta_iterator_->SeekToFirst();
 | |
|       } else if (current_at_base_) {
 | |
|         // Change delta from larger than base to smaller
 | |
|         AdvanceDelta();
 | |
|       } else {
 | |
|         // Change base from larger than delta to smaller
 | |
|         AdvanceBase();
 | |
|       }
 | |
|       if (DeltaValid() && BaseValid()) {
 | |
|         if (Compare() == 0) {
 | |
|           equal_keys_ = true;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|     Advance();
 | |
|   }
 | |
| 
 | |
|   void Prev() override {
 | |
|     if (!Valid()) {
 | |
|       status_ = Status::NotSupported("Prev() on invalid iterator");
 | |
|     }
 | |
| 
 | |
|     if (forward_) {
 | |
|       // Need to change direction
 | |
|       // if our direction was backward and we're not equal, we have two states:
 | |
|       // * both iterators are valid: we're already in a good state (current
 | |
|       // shows to smaller)
 | |
|       // * only one iterator is valid: we need to advance that iterator
 | |
|       forward_ = false;
 | |
|       equal_keys_ = false;
 | |
|       if (!BaseValid()) {
 | |
|         assert(DeltaValid());
 | |
|         base_iterator_->SeekToLast();
 | |
|       } else if (!DeltaValid()) {
 | |
|         delta_iterator_->SeekToLast();
 | |
|       } else if (current_at_base_) {
 | |
|         // Change delta from less advanced than base to more advanced
 | |
|         AdvanceDelta();
 | |
|       } else {
 | |
|         // Change base from less advanced than delta to more advanced
 | |
|         AdvanceBase();
 | |
|       }
 | |
|       if (DeltaValid() && BaseValid()) {
 | |
|         if (Compare() == 0) {
 | |
|           equal_keys_ = true;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     Advance();
 | |
|   }
 | |
| 
 | |
|   Slice key() const override {
 | |
|     return current_at_base_ ? base_iterator_->key()
 | |
|                             : delta_iterator_->Entry().key;
 | |
|   }
 | |
| 
 | |
|   Slice value() const override {
 | |
|     return current_at_base_ ? base_iterator_->value()
 | |
|                             : delta_iterator_->Entry().value;
 | |
|   }
 | |
| 
 | |
|   Status status() const {
 | |
|     if (!status_.ok()) {
 | |
|       return status_;
 | |
|     }
 | |
|     if (!base_iterator_->status().ok()) {
 | |
|       return base_iterator_->status();
 | |
|     }
 | |
|     return delta_iterator_->status();
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   // -1 -- delta less advanced than base
 | |
|   // 0 -- delta == base
 | |
|   // 1 -- delta more advanced than base
 | |
|   int Compare() const {
 | |
|     assert(delta_iterator_->Valid() && base_iterator_->Valid());
 | |
|     int cmp = comparator_->Compare(delta_iterator_->Entry().key,
 | |
|                                    base_iterator_->key());
 | |
|     if (forward_) {
 | |
|       return cmp;
 | |
|     } else {
 | |
|       return -cmp;
 | |
|     }
 | |
|   }
 | |
|   bool IsDeltaDelete() {
 | |
|     assert(DeltaValid());
 | |
|     return delta_iterator_->Entry().type == kDeleteRecord;
 | |
|   }
 | |
|   void AssertInvariants() {
 | |
| #ifndef NDEBUG
 | |
|     if (!Valid()) {
 | |
|       return;
 | |
|     }
 | |
|     if (!BaseValid()) {
 | |
|       assert(!current_at_base_ && delta_iterator_->Valid());
 | |
|       return;
 | |
|     }
 | |
|     if (!DeltaValid()) {
 | |
|       assert(current_at_base_ && base_iterator_->Valid());
 | |
|       return;
 | |
|     }
 | |
|     // we don't support those yet
 | |
|     assert(delta_iterator_->Entry().type != kMergeRecord &&
 | |
|            delta_iterator_->Entry().type != kLogDataRecord);
 | |
|     int compare = comparator_->Compare(delta_iterator_->Entry().key,
 | |
|                                        base_iterator_->key());
 | |
|     if (forward_) {
 | |
|       // current_at_base -> compare < 0
 | |
|       assert(!current_at_base_ || compare < 0);
 | |
|       // !current_at_base -> compare <= 0
 | |
|       assert(current_at_base_ && compare >= 0);
 | |
|     } else {
 | |
|       // current_at_base -> compare > 0
 | |
|       assert(!current_at_base_ || compare > 0);
 | |
|       // !current_at_base -> compare <= 0
 | |
|       assert(current_at_base_ && compare <= 0);
 | |
|     }
 | |
|     // equal_keys_ <=> compare == 0
 | |
|     assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
 | |
| #endif
 | |
|   }
 | |
| 
 | |
|   void Advance() {
 | |
|     if (equal_keys_) {
 | |
|       assert(BaseValid() && DeltaValid());
 | |
|       AdvanceBase();
 | |
|       AdvanceDelta();
 | |
|     } else {
 | |
|       if (current_at_base_) {
 | |
|         assert(BaseValid());
 | |
|         AdvanceBase();
 | |
|       } else {
 | |
|         assert(DeltaValid());
 | |
|         AdvanceDelta();
 | |
|       }
 | |
|     }
 | |
|     UpdateCurrent();
 | |
|   }
 | |
| 
 | |
|   void AdvanceDelta() {
 | |
|     if (forward_) {
 | |
|       delta_iterator_->Next();
 | |
|     } else {
 | |
|       delta_iterator_->Prev();
 | |
|     }
 | |
|   }
 | |
|   void AdvanceBase() {
 | |
|     if (forward_) {
 | |
|       base_iterator_->Next();
 | |
|     } else {
 | |
|       base_iterator_->Prev();
 | |
|     }
 | |
|   }
 | |
|   bool BaseValid() const { return base_iterator_->Valid(); }
 | |
|   bool DeltaValid() const { return delta_iterator_->Valid(); }
 | |
|   void UpdateCurrent() {
 | |
|     while (true) {
 | |
|       equal_keys_ = false;
 | |
|       if (!BaseValid()) {
 | |
|         // Base has finished.
 | |
|         if (!DeltaValid()) {
 | |
|           // Finished
 | |
|           return;
 | |
|         }
 | |
|         if (IsDeltaDelete()) {
 | |
|           AdvanceDelta();
 | |
|         } else {
 | |
|           current_at_base_ = false;
 | |
|           return;
 | |
|         }
 | |
|       } else if (!DeltaValid()) {
 | |
|         // Delta has finished.
 | |
|         current_at_base_ = true;
 | |
|         return;
 | |
|       } else {
 | |
|         int compare = Compare();
 | |
|         if (compare <= 0) {  // delta bigger or equal
 | |
|           if (compare == 0) {
 | |
|             equal_keys_ = true;
 | |
|           }
 | |
|           if (!IsDeltaDelete()) {
 | |
|             current_at_base_ = false;
 | |
|             return;
 | |
|           }
 | |
|           // Delta is less advanced and is delete.
 | |
|           AdvanceDelta();
 | |
|           if (equal_keys_) {
 | |
|             AdvanceBase();
 | |
|           }
 | |
|         } else {
 | |
|           current_at_base_ = true;
 | |
|           return;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     AssertInvariants();
 | |
|   }
 | |
| 
 | |
|   bool forward_;
 | |
|   bool current_at_base_;
 | |
|   bool equal_keys_;
 | |
|   Status status_;
 | |
|   std::unique_ptr<Iterator> base_iterator_;
 | |
|   std::unique_ptr<WBWIIterator> delta_iterator_;
 | |
|   const Comparator* comparator_;  // not owned
 | |
| };
 | |
| 
 | |
| class ReadableWriteBatch : public WriteBatch {
 | |
|  public:
 | |
|   explicit ReadableWriteBatch(size_t reserved_bytes = 0)
 | |
|       : WriteBatch(reserved_bytes) {}
 | |
|   // Retrieve some information from a write entry in the write batch, given
 | |
|   // the start offset of the write entry.
 | |
|   Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
 | |
|                                 Slice* value, Slice* blob) const;
 | |
| };
 | |
| 
 | |
| // Key used by skip list, as the binary searchable index of WriteBatchWithIndex.
 | |
| struct WriteBatchIndexEntry {
 | |
|   WriteBatchIndexEntry(size_t o, uint32_t c)
 | |
|       : offset(o), column_family(c), search_key(nullptr) {}
 | |
|   WriteBatchIndexEntry(const Slice* sk, uint32_t c)
 | |
|       : offset(0), column_family(c), search_key(sk) {}
 | |
| 
 | |
|   // If this flag appears in the offset, it indicates a key that is smaller
 | |
|   // than any other entry for the same column family
 | |
|   static const size_t kFlagMin = std::numeric_limits<size_t>::max();
 | |
| 
 | |
|   size_t offset;           // offset of an entry in write batch's string buffer.
 | |
|   uint32_t column_family;  // column family of the entry
 | |
|   const Slice* search_key;  // if not null, instead of reading keys from
 | |
|                             // write batch, use it to compare. This is used
 | |
|                             // for lookup key.
 | |
| };
 | |
| 
 | |
| class WriteBatchEntryComparator {
 | |
|  public:
 | |
|   WriteBatchEntryComparator(const Comparator* default_comparator,
 | |
|                             const ReadableWriteBatch* write_batch)
 | |
|       : default_comparator_(default_comparator), write_batch_(write_batch) {}
 | |
|   // Compare a and b. Return a negative value if a is less than b, 0 if they
 | |
|   // are equal, and a positive value if a is greater than b
 | |
|   int operator()(const WriteBatchIndexEntry* entry1,
 | |
|                  const WriteBatchIndexEntry* entry2) const;
 | |
| 
 | |
|   int CompareKey(uint32_t column_family, const Slice& key1,
 | |
|                  const Slice& key2) const;
 | |
| 
 | |
|   void SetComparatorForCF(uint32_t column_family_id,
 | |
|                           const Comparator* comparator) {
 | |
|     cf_comparator_map_[column_family_id] = comparator;
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   const Comparator* default_comparator_;
 | |
|   std::unordered_map<uint32_t, const Comparator*> cf_comparator_map_;
 | |
|   const ReadableWriteBatch* write_batch_;
 | |
| };
 | |
| 
 | |
| typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
 | |
|     WriteBatchEntrySkipList;
 | |
| 
 | |
| class WBWIIteratorImpl : public WBWIIterator {
 | |
|  public:
 | |
|   WBWIIteratorImpl(uint32_t column_family_id,
 | |
|                    WriteBatchEntrySkipList* skip_list,
 | |
|                    const ReadableWriteBatch* write_batch)
 | |
|       : column_family_id_(column_family_id),
 | |
|         skip_list_iter_(skip_list),
 | |
|         write_batch_(write_batch),
 | |
|         valid_(false) {}
 | |
| 
 | |
|   virtual ~WBWIIteratorImpl() {}
 | |
| 
 | |
|   virtual bool Valid() const override { return valid_; }
 | |
| 
 | |
|   virtual void SeekToFirst() {
 | |
|     valid_ = true;
 | |
|     WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
 | |
|                                       column_family_id_);
 | |
|     skip_list_iter_.Seek(&search_entry);
 | |
|     ReadEntry();
 | |
|   }
 | |
| 
 | |
|   virtual void SeekToLast() {
 | |
|     valid_ = true;
 | |
|     WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
 | |
|                                       column_family_id_ + 1);
 | |
|     skip_list_iter_.Seek(&search_entry);
 | |
|     if (!skip_list_iter_.Valid()) {
 | |
|       skip_list_iter_.SeekToLast();
 | |
|     } else {
 | |
|       skip_list_iter_.Prev();
 | |
|     }
 | |
|     ReadEntry();
 | |
|   }
 | |
| 
 | |
|   virtual void Seek(const Slice& key) override {
 | |
|     valid_ = true;
 | |
|     WriteBatchIndexEntry search_entry(&key, column_family_id_);
 | |
|     skip_list_iter_.Seek(&search_entry);
 | |
|     ReadEntry();
 | |
|   }
 | |
| 
 | |
|   virtual void Next() override {
 | |
|     skip_list_iter_.Next();
 | |
|     ReadEntry();
 | |
|   }
 | |
| 
 | |
|   virtual void Prev() override {
 | |
|     skip_list_iter_.Prev();
 | |
|     ReadEntry();
 | |
|   }
 | |
| 
 | |
|   virtual const WriteEntry& Entry() const override { return current_; }
 | |
| 
 | |
|   virtual Status status() const override { return status_; }
 | |
| 
 | |
|   const WriteBatchIndexEntry* GetRawEntry() const {
 | |
|     return skip_list_iter_.key();
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   uint32_t column_family_id_;
 | |
|   WriteBatchEntrySkipList::Iterator skip_list_iter_;
 | |
|   const ReadableWriteBatch* write_batch_;
 | |
|   Status status_;
 | |
|   bool valid_;
 | |
|   WriteEntry current_;
 | |
| 
 | |
|   void ReadEntry() {
 | |
|     if (!status_.ok() || !skip_list_iter_.Valid()) {
 | |
|       valid_ = false;
 | |
|       return;
 | |
|     }
 | |
|     const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
 | |
|     if (iter_entry == nullptr ||
 | |
|         iter_entry->column_family != column_family_id_) {
 | |
|       valid_ = false;
 | |
|       return;
 | |
|     }
 | |
|     Slice blob;
 | |
|     status_ = write_batch_->GetEntryFromDataOffset(
 | |
|         iter_entry->offset, ¤t_.type, ¤t_.key, ¤t_.value,
 | |
|         &blob);
 | |
|     if (!status_.ok()) {
 | |
|       valid_ = false;
 | |
|     } else if (current_.type != kPutRecord && current_.type != kDeleteRecord &&
 | |
|                current_.type != kMergeRecord) {
 | |
|       valid_ = false;
 | |
|       status_ = Status::Corruption("write batch index is corrupted");
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| struct WriteBatchWithIndex::Rep {
 | |
|   Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
 | |
|       bool overwrite_key = false)
 | |
|       : write_batch(reserved_bytes),
 | |
|         comparator(index_comparator, &write_batch),
 | |
|         skip_list(comparator, &arena),
 | |
|         overwrite_key(overwrite_key),
 | |
|         last_entry_offset(0) {}
 | |
|   ReadableWriteBatch write_batch;
 | |
|   WriteBatchEntryComparator comparator;
 | |
|   Arena arena;
 | |
|   WriteBatchEntrySkipList skip_list;
 | |
|   bool overwrite_key;
 | |
|   size_t last_entry_offset;
 | |
| 
 | |
|   // Remember current offset of internal write batch, which is used as
 | |
|   // the starting offset of the next record.
 | |
|   void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }
 | |
| 
 | |
|   // In overwrite mode, find the existing entry for the same key and update it
 | |
|   // to point to the current entry.
 | |
|   // Return true if the key is found and updated.
 | |
|   bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
 | |
|   bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);
 | |
| 
 | |
|   // Add the recent entry to the update.
 | |
|   // In overwrite mode, if key already exists in the index, update it.
 | |
|   void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
 | |
|   void AddOrUpdateIndex(const Slice& key);
 | |
| 
 | |
|   // Allocate an index entry pointing to the last entry in the write batch and
 | |
|   // put it to skip list.
 | |
|   void AddNewEntry(uint32_t column_family_id);
 | |
| };
 | |
| 
 | |
| bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
 | |
|     ColumnFamilyHandle* column_family, const Slice& key) {
 | |
|   uint32_t cf_id = GetColumnFamilyID(column_family);
 | |
|   return UpdateExistingEntryWithCfId(cf_id, key);
 | |
| }
 | |
| 
 | |
| bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
 | |
|     uint32_t column_family_id, const Slice& key) {
 | |
|   if (!overwrite_key) {
 | |
|     return false;
 | |
|   }
 | |
| 
 | |
|   WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
 | |
|   iter.Seek(key);
 | |
|   if (!iter.Valid()) {
 | |
|     return false;
 | |
|   }
 | |
|   if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
 | |
|     return false;
 | |
|   }
 | |
|   WriteBatchIndexEntry* non_const_entry =
 | |
|       const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
 | |
|   non_const_entry->offset = last_entry_offset;
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
 | |
|     ColumnFamilyHandle* column_family, const Slice& key) {
 | |
|   if (!UpdateExistingEntry(column_family, key)) {
 | |
|     uint32_t cf_id = GetColumnFamilyID(column_family);
 | |
|     const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
 | |
|     if (cf_cmp != nullptr) {
 | |
|       comparator.SetComparatorForCF(cf_id, cf_cmp);
 | |
|     }
 | |
|     AddNewEntry(cf_id);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
 | |
|   if (!UpdateExistingEntryWithCfId(0, key)) {
 | |
|     AddNewEntry(0);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
 | |
|     auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
 | |
|     auto* index_entry =
 | |
|         new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id);
 | |
|     skip_list.Insert(index_entry);
 | |
|   }
 | |
| 
 | |
| Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
 | |
|                                                   WriteType* type, Slice* Key,
 | |
|                                                   Slice* value,
 | |
|                                                   Slice* blob) const {
 | |
|   if (type == nullptr || Key == nullptr || value == nullptr ||
 | |
|       blob == nullptr) {
 | |
|     return Status::InvalidArgument("Output parameters cannot be null");
 | |
|   }
 | |
| 
 | |
|   if (data_offset >= GetDataSize()) {
 | |
|     return Status::InvalidArgument("data offset exceed write batch size");
 | |
|   }
 | |
|   Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
 | |
|   char tag;
 | |
|   uint32_t column_family;
 | |
|   Status s =
 | |
|       ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob);
 | |
| 
 | |
|   switch (tag) {
 | |
|     case kTypeColumnFamilyValue:
 | |
|     case kTypeValue:
 | |
|       *type = kPutRecord;
 | |
|       break;
 | |
|     case kTypeColumnFamilyDeletion:
 | |
|     case kTypeDeletion:
 | |
|       *type = kDeleteRecord;
 | |
|       break;
 | |
|     case kTypeColumnFamilyMerge:
 | |
|     case kTypeMerge:
 | |
|       *type = kMergeRecord;
 | |
|       break;
 | |
|     case kTypeLogData:
 | |
|       *type = kLogDataRecord;
 | |
|       break;
 | |
|     default:
 | |
|       return Status::Corruption("unknown WriteBatch tag");
 | |
|   }
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| WriteBatchWithIndex::WriteBatchWithIndex(
 | |
|     const Comparator* default_index_comparator, size_t reserved_bytes,
 | |
|     bool overwrite_key)
 | |
|     : rep(new Rep(default_index_comparator, reserved_bytes, overwrite_key)) {}
 | |
| 
 | |
| WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; }
 | |
| 
 | |
| WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
 | |
| 
 | |
| WBWIIterator* WriteBatchWithIndex::NewIterator() {
 | |
|   return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
 | |
| }
 | |
| 
 | |
| WBWIIterator* WriteBatchWithIndex::NewIterator(
 | |
|     ColumnFamilyHandle* column_family) {
 | |
|   return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
 | |
|                               &(rep->skip_list), &rep->write_batch);
 | |
| }
 | |
| 
 | |
| Iterator* WriteBatchWithIndex::NewIteratorWithBase(
 | |
|     ColumnFamilyHandle* column_family, Iterator* base_iterator) {
 | |
|   if (rep->overwrite_key == false) {
 | |
|     assert(false);
 | |
|     return nullptr;
 | |
|   }
 | |
|   return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
 | |
|                                GetColumnFamilyUserComparator(column_family));
 | |
| }
 | |
| 
 | |
| void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
 | |
|                               const Slice& key, const Slice& value) {
 | |
|   rep->SetLastEntryOffset();
 | |
|   rep->write_batch.Put(column_family, key, value);
 | |
|   rep->AddOrUpdateIndex(column_family, key);
 | |
| }
 | |
| 
 | |
| void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
 | |
|   rep->SetLastEntryOffset();
 | |
|   rep->write_batch.Put(key, value);
 | |
|   rep->AddOrUpdateIndex(key);
 | |
| }
 | |
| 
 | |
| void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
 | |
|                                 const Slice& key, const Slice& value) {
 | |
|   rep->SetLastEntryOffset();
 | |
|   rep->write_batch.Merge(column_family, key, value);
 | |
|   rep->AddOrUpdateIndex(column_family, key);
 | |
| }
 | |
| 
 | |
| void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
 | |
|   rep->SetLastEntryOffset();
 | |
|   rep->write_batch.Merge(key, value);
 | |
|   rep->AddOrUpdateIndex(key);
 | |
| }
 | |
| 
 | |
| void WriteBatchWithIndex::PutLogData(const Slice& blob) {
 | |
|   rep->write_batch.PutLogData(blob);
 | |
| }
 | |
| 
 | |
| void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
 | |
|                                  const Slice& key) {
 | |
|   rep->SetLastEntryOffset();
 | |
|   rep->write_batch.Delete(column_family, key);
 | |
|   rep->AddOrUpdateIndex(column_family, key);
 | |
| }
 | |
| 
 | |
| void WriteBatchWithIndex::Delete(const Slice& key) {
 | |
|   rep->SetLastEntryOffset();
 | |
|   rep->write_batch.Delete(key);
 | |
|   rep->AddOrUpdateIndex(key);
 | |
| }
 | |
| 
 | |
| int WriteBatchEntryComparator::operator()(
 | |
|     const WriteBatchIndexEntry* entry1,
 | |
|     const WriteBatchIndexEntry* entry2) const {
 | |
|   if (entry1->column_family > entry2->column_family) {
 | |
|     return 1;
 | |
|   } else if (entry1->column_family < entry2->column_family) {
 | |
|     return -1;
 | |
|   }
 | |
| 
 | |
|   if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
 | |
|     return -1;
 | |
|   } else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
 | |
|     return 1;
 | |
|   }
 | |
| 
 | |
|   Status s;
 | |
|   Slice key1, key2;
 | |
|   if (entry1->search_key == nullptr) {
 | |
|     Slice value, blob;
 | |
|     WriteType write_type;
 | |
|     s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1,
 | |
|                                              &value, &blob);
 | |
|     if (!s.ok()) {
 | |
|       return 1;
 | |
|     }
 | |
|   } else {
 | |
|     key1 = *(entry1->search_key);
 | |
|   }
 | |
|   if (entry2->search_key == nullptr) {
 | |
|     Slice value, blob;
 | |
|     WriteType write_type;
 | |
|     s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2,
 | |
|                                              &value, &blob);
 | |
|     if (!s.ok()) {
 | |
|       return -1;
 | |
|     }
 | |
|   } else {
 | |
|     key2 = *(entry2->search_key);
 | |
|   }
 | |
| 
 | |
|   int cmp = CompareKey(entry1->column_family, key1, key2);
 | |
|   if (cmp != 0) {
 | |
|     return cmp;
 | |
|   } else if (entry1->offset > entry2->offset) {
 | |
|     return 1;
 | |
|   } else if (entry1->offset < entry2->offset) {
 | |
|     return -1;
 | |
|   }
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
 | |
|                                           const Slice& key1,
 | |
|                                           const Slice& key2) const {
 | |
|   auto comparator_for_cf = cf_comparator_map_.find(column_family);
 | |
|   if (comparator_for_cf != cf_comparator_map_.end()) {
 | |
|     return comparator_for_cf->second->Compare(key1, key2);
 | |
|   } else {
 | |
|     return default_comparator_->Compare(key1, key2);
 | |
|   }
 | |
| }
 | |
| 
 | |
| }  // namespace rocksdb
 | |
| 
 |