// Fork of https://github.com/oxigraph/rocksdb and
// https://github.com/facebook/rocksdb for NextGraph and Oxigraph.
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"

#include "db/column_family.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/coding.h"
#include "util/string_util.h"

namespace rocksdb {

class Env;
class Logger;
class Statistics;

// Decodes the write-batch record stored at `data_offset` and reports its
// type together with the key/value/blob/xid slices it references.
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
                                                  WriteType* type, Slice* Key,
                                                  Slice* value, Slice* blob,
                                                  Slice* xid) const {
  if (type == nullptr || Key == nullptr || value == nullptr ||
      blob == nullptr || xid == nullptr) {
    return Status::InvalidArgument("Output parameters cannot be null");
  }

  if (data_offset == GetDataSize()) {
    // Reached end of batch.
    return Status::NotFound();
  }

  if (data_offset > GetDataSize()) {
    return Status::InvalidArgument("data offset exceeds write batch size");
  }
  Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
  char tag;
  uint32_t column_family;
  Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value,
                                      blob, xid);
  // Bail out before inspecting `tag`; it is not initialized if the read
  // failed.
  if (!s.ok()) {
    return s;
  }

  switch (tag) {
    case kTypeColumnFamilyValue:
    case kTypeValue:
      *type = kPutRecord;
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeDeletion:
      *type = kDeleteRecord;
      break;
    case kTypeColumnFamilySingleDeletion:
    case kTypeSingleDeletion:
      *type = kSingleDeleteRecord;
      break;
    case kTypeColumnFamilyRangeDeletion:
    case kTypeRangeDeletion:
      *type = kDeleteRangeRecord;
      break;
    case kTypeColumnFamilyMerge:
    case kTypeMerge:
      *type = kMergeRecord;
      break;
    case kTypeLogData:
      *type = kLogDataRecord;
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
    case kTypeBeginPersistedPrepareXID:
    case kTypeBeginUnprepareXID:
    case kTypeEndPrepareXID:
    case kTypeCommitXID:
    case kTypeRollbackXID:
      *type = kXIDRecord;
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag ",
                                ToString(static_cast<unsigned int>(tag)));
  }
  return Status::OK();
}
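
// Illustrative decoding call (a sketch, not part of the upstream file; `rwb`
// is assumed to be a ReadableWriteBatch and `offset` an offset taken from a
// WriteBatchIndexEntry in the batch's index):
//
//   WriteType type;
//   Slice key, value, blob, xid;
//   Status s = rwb->GetEntryFromDataOffset(offset, &type, &key, &value,
//                                          &blob, &xid);
//   if (s.ok()) {
//     // `type` now tells the caller whether the record is a put, delete,
//     // single delete, delete-range, merge, log-data or XID marker.
//   }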

// If both `entry1` and `entry2` point to real entries in the write batch, we
// compare the entries as follows:
// 1. First compare the column families; the entry with the larger column
//    family ID is larger.
// 2. Within the same column family, decode each entry to find its key; the
//    entry with the larger key is larger.
// 3. If two entries have the same column family and key, the one with the
//    larger offset is larger.
// Sometimes either `entry1` or `entry2` is a dummy entry that is actually a
// search key. In that case, step 2 does not decode the entry but uses the
// value in WriteBatchIndexEntry::search_key instead.
// One special case is WriteBatchIndexEntry::key_size being kFlagMinInCf,
// which indicates a seek to the first entry of the column family. Such an
// entry compares smaller than all real entries of that column family.
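//
// For example, with the default bytewise comparator the index orders
//   (cf=1, key="a") < (cf=1, key="b") < (cf=2, key="a"),
// and two real entries for (cf=1, key="a") are ordered by their offsets,
// i.e. by the order in which they were added to the batch.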
int WriteBatchEntryComparator::operator()(
    const WriteBatchIndexEntry* entry1,
    const WriteBatchIndexEntry* entry2) const {
  if (entry1->column_family > entry2->column_family) {
    return 1;
  } else if (entry1->column_family < entry2->column_family) {
    return -1;
  }

  // Deal with special case of seeking to the beginning of a column family
  if (entry1->is_min_in_cf()) {
    return -1;
  } else if (entry2->is_min_in_cf()) {
    return 1;
  }

  Slice key1, key2;
  if (entry1->search_key == nullptr) {
    key1 = Slice(write_batch_->Data().data() + entry1->key_offset,
                 entry1->key_size);
  } else {
    key1 = *(entry1->search_key);
  }
  if (entry2->search_key == nullptr) {
    key2 = Slice(write_batch_->Data().data() + entry2->key_offset,
                 entry2->key_size);
  } else {
    key2 = *(entry2->search_key);
  }

  int cmp = CompareKey(entry1->column_family, key1, key2);
  if (cmp != 0) {
    return cmp;
  } else if (entry1->offset > entry2->offset) {
    return 1;
  } else if (entry1->offset < entry2->offset) {
    return -1;
  }
  return 0;
}

// Compares `key1` and `key2` with the comparator registered for
// `column_family`, falling back to the default comparator when none has been
// registered for that column family.
int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
                                          const Slice& key1,
                                          const Slice& key2) const {
  if (column_family < cf_comparators_.size() &&
      cf_comparators_[column_family] != nullptr) {
    return cf_comparators_[column_family]->Compare(key1, key2);
  } else {
    return default_comparator_->Compare(key1, key2);
  }
}
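
// Illustrative per-CF comparator registration (a sketch; SetComparatorForCF
// is declared in the companion header, and ReverseBytewiseComparator() comes
// from rocksdb/comparator.h):
//
//   WriteBatchEntryComparator cmp(BytewiseComparator(), &write_batch);
//   cmp.SetComparatorForCF(2, ReverseBytewiseComparator());
//   // Now cmp.CompareKey(2, "a", "b") > 0, while cmp.CompareKey(1, "a", "b")
//   // still compares bytewise and so is < 0.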

WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
    const ImmutableDBOptions& immutable_db_options, WriteBatchWithIndex* batch,
    ColumnFamilyHandle* column_family, const Slice& key,
    MergeContext* merge_context, WriteBatchEntryComparator* cmp,
    std::string* value, bool overwrite_key, Status* s) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  *s = Status::OK();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::Result::kNotFound;

  std::unique_ptr<WBWIIterator> iter =
      std::unique_ptr<WBWIIterator>(batch->NewIterator(column_family));

  // We want to iterate in the reverse order that the writes were added to the
  // batch. Since we don't have a reverse iterator, we must seek past the end.
  // TODO(agiardullo): consider adding support for reverse iteration
  iter->Seek(key);
  while (iter->Valid()) {
    const WriteEntry entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      break;
    }

    iter->Next();
  }

  if (!(*s).ok()) {
    return WriteBatchWithIndexInternal::Result::kError;
  }

  if (!iter->Valid()) {
    // Read past end of results. Reposition on last result.
    iter->SeekToLast();
  } else {
    iter->Prev();
  }

  Slice entry_value;
  while (iter->Valid()) {
    const WriteEntry entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      // Unexpected error or we've reached a different next key
      break;
    }

    switch (entry.type) {
      case kPutRecord: {
        result = WriteBatchWithIndexInternal::Result::kFound;
        entry_value = entry.value;
        break;
      }
      case kMergeRecord: {
        result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
        merge_context->PushOperand(entry.value);
        break;
      }
      case kDeleteRecord:
      case kSingleDeleteRecord: {
        result = WriteBatchWithIndexInternal::Result::kDeleted;
        break;
      }
      case kLogDataRecord:
      case kXIDRecord: {
        // ignore
        break;
      }
      default: {
        result = WriteBatchWithIndexInternal::Result::kError;
        (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
                                  ToString(entry.type));
        break;
      }
    }
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted ||
        result == WriteBatchWithIndexInternal::Result::kError) {
      // We can stop iterating once we find a PUT or DELETE
      break;
    }
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
        overwrite_key == true) {
      // Since we've overwritten keys, we do not know what other operations are
      // in this batch for this key, so we cannot do a Merge to compute the
      // result. Instead, we will simply return MergeInProgress.
      break;
    }

    iter->Prev();
  }

  if ((*s).ok()) {
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted) {
      // Found a Put or Delete. Merge if necessary.
      if (merge_context->GetNumOperands() > 0) {
        const MergeOperator* merge_operator;

        if (column_family != nullptr) {
          auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
          merge_operator = cfh->cfd()->ioptions()->merge_operator;
        } else {
          *s = Status::InvalidArgument("Must provide a column_family");
          result = WriteBatchWithIndexInternal::Result::kError;
          return result;
        }
        Statistics* statistics = immutable_db_options.statistics.get();
        Env* env = immutable_db_options.env;
        Logger* logger = immutable_db_options.info_log.get();

        if (merge_operator) {
          *s = MergeHelper::TimedFullMerge(merge_operator, key, &entry_value,
                                           merge_context->GetOperands(), value,
                                           logger, statistics, env);
        } else {
          *s = Status::InvalidArgument("Options::merge_operator must be set");
        }
        if ((*s).ok()) {
          result = WriteBatchWithIndexInternal::Result::kFound;
        } else {
          result = WriteBatchWithIndexInternal::Result::kError;
        }
      } else {  // nothing to merge
        if (result == WriteBatchWithIndexInternal::Result::kFound) {  // PUT
          value->assign(entry_value.data(), entry_value.size());
        }
      }
    }
  }

  return result;
}
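
// Illustrative use through the public API (a sketch, not part of this file;
// see rocksdb/utilities/write_batch_with_index.h):
//
//   WriteBatchWithIndex batch;
//   batch.Put("key", "v1");
//   std::string value;
//   Status s = batch.GetFromBatch(DBOptions(), "key", &value);
//   // s.ok() and value == "v1"
//   batch.Delete("key");
//   s = batch.GetFromBatch(DBOptions(), "key", &value);
//   // s.IsNotFound(): the delete record above shadows the earlier put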

}  // namespace rocksdb

#endif  // !ROCKSDB_LITE