diff --git a/db/dbformat.h b/db/dbformat.h index 3a9682d1d..51f5e4143 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -465,6 +465,12 @@ class InternalKeySliceTransform : public SliceTransform { const SliceTransform* const transform_; }; +// Read the key of a record from a write batch. +// if this record represent the default column family then cf_record +// must be passed as false, otherwise it must be passed as true. +extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, + bool cf_record); + // Read record from a write batch piece from input. // tag, column_family, key, value and blob are return values. Callers own the // Slice they point to. diff --git a/db/write_batch.cc b/db/write_batch.cc index 73532a2da..8a54432bb 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -94,7 +94,7 @@ struct SavePoints { WriteBatch::WriteBatch(size_t reserved_bytes) : save_points_(nullptr), content_flags_(0), rep_() { - rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ? + rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ? reserved_bytes : WriteBatchInternal::kHeader); rep_.resize(WriteBatchInternal::kHeader); } @@ -192,6 +192,23 @@ bool WriteBatch::HasMerge() const { return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0; } +bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) { + assert(input != nullptr && key != nullptr); + // Skip tag byte + input->remove_prefix(1); + + if (cf_record) { + // Skip column_family bytes + uint32_t cf; + if (!GetVarint32(input, &cf)) { + return false; + } + } + + // Extract key + return GetLengthPrefixedSlice(input, key); +} + Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, Slice* value, Slice* blob) { @@ -328,8 +345,8 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } -size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { - return WriteBatchInternal::kHeader; +size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { + return WriteBatchInternal::kHeader; } void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, @@ -841,7 +858,7 @@ void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { SetCount(dst, Count(dst) + Count(src)); assert(src->rep_.size() >= WriteBatchInternal::kHeader); - dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, + dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src->rep_.size() - WriteBatchInternal::kHeader); dst->content_flags_.store( dst->content_flags_.load(std::memory_order_relaxed) | diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index e57c95c42..67a369c8b 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -311,13 +311,13 @@ class WBWIIteratorImpl : public WBWIIterator { virtual void SeekToFirst() override { WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, - column_family_id_); + column_family_id_, 0, 0); skip_list_iter_.Seek(&search_entry); } virtual void SeekToLast() override { WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, - column_family_id_ + 1); + column_family_id_ + 1, 0, 0); skip_list_iter_.Seek(&search_entry); if (!skip_list_iter_.Valid()) { skip_list_iter_.SeekToLast(); @@ -454,9 +454,19 @@ void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) { } void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { + const std::string& wb_data = write_batch.Data(); + Slice entry_ptr = Slice(wb_data.data() + last_entry_offset, + wb_data.size() - last_entry_offset); + // Extract key + Slice key; + bool success = + ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0); + assert(success); + auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry)); auto* index_entry = - new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id); + new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id, + key.data() - wb_data.data(), key.size()); skip_list.Insert(index_entry); } diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index 7b1a6dd27..89114f02d 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -86,27 +86,16 @@ int WriteBatchEntryComparator::operator()( return 1; } - Status s; Slice key1, key2; if (entry1->search_key == nullptr) { - Slice value, blob; - WriteType write_type; - s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1, - &value, &blob); - if (!s.ok()) { - return 1; - } + key1 = Slice(write_batch_->Data().data() + entry1->key_offset, + entry1->key_size); } else { key1 = *(entry1->search_key); } if (entry2->search_key == nullptr) { - Slice value, blob; - WriteType write_type; - s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2, - &value, &blob); - if (!s.ok()) { - return -1; - } + key2 = Slice(write_batch_->Data().data() + entry2->key_offset, + entry2->key_size); } else { key2 = *(entry2->search_key); } @@ -125,9 +114,9 @@ int WriteBatchEntryComparator::operator()( int WriteBatchEntryComparator::CompareKey(uint32_t column_family, const Slice& key1, const Slice& key2) const { - auto comparator_for_cf = cf_comparator_map_.find(column_family); - if (comparator_for_cf != cf_comparator_map_.end()) { - return comparator_for_cf->second->Compare(key1, key2); + if (column_family < cf_comparators_.size() && + cf_comparators_[column_family] != nullptr) { + return cf_comparators_[column_family]->Compare(key1, key2); } else { return default_comparator_->Compare(key1, key2); } diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index ec4da19e4..b45dcadf8 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -8,7 +8,7 @@ #include #include -#include +#include #include "rocksdb/comparator.h" #include "rocksdb/iterator.h" @@ -24,17 +24,28 @@ struct Options; // Key used by skip list, as the binary searchable index of WriteBatchWithIndex. struct WriteBatchIndexEntry { - WriteBatchIndexEntry(size_t o, uint32_t c) - : offset(o), column_family(c), search_key(nullptr) {} + WriteBatchIndexEntry(size_t o, uint32_t c, size_t ko, size_t ksz) + : offset(o), + column_family(c), + key_offset(ko), + key_size(ksz), + search_key(nullptr) {} WriteBatchIndexEntry(const Slice* sk, uint32_t c) - : offset(0), column_family(c), search_key(sk) {} + : offset(0), + column_family(c), + key_offset(0), + key_size(0), + search_key(sk) {} // If this flag appears in the offset, it indicates a key that is smaller // than any other entry for the same column family static const size_t kFlagMin = port::kMaxSizet; size_t offset; // offset of an entry in write batch's string buffer. - uint32_t column_family; // column family of the entry + uint32_t column_family; // column family of the entry. + size_t key_offset; // offset of the key in write batch's string buffer. + size_t key_size; // size of the key. + const Slice* search_key; // if not null, instead of reading keys from // write batch, use it to compare. This is used // for lookup key. @@ -65,14 +76,17 @@ class WriteBatchEntryComparator { void SetComparatorForCF(uint32_t column_family_id, const Comparator* comparator) { - cf_comparator_map_[column_family_id] = comparator; + if (column_family_id >= cf_comparators_.size()) { + cf_comparators_.resize(column_family_id + 1, nullptr); + } + cf_comparators_[column_family_id] = comparator; } const Comparator* default_comparator() { return default_comparator_; } private: const Comparator* default_comparator_; - std::unordered_map cf_comparator_map_; + std::vector cf_comparators_; const ReadableWriteBatch* write_batch_; };