Separate the handling of value types in SaveValue (#10840)

Summary:
Currently, the code in `SaveValue` that handles `kTypeValue` and
`kTypeBlobIndex` (and more recently, `kTypeWideColumnEntity`) is
mostly shared. This made sense originally; however, by now the
handling of these three value types has diverged significantly. The
patch makes the logic cleaner and also eliminates quite a bit of branching
by giving each value type its own `case` and removing a fall-through.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10840

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D40568420

Pulled By: ltamasi

fbshipit-source-id: 2e614606afd1c3d9c76d9b5f1efa0959fc174103
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent 2564215e35
commit 8dd4bf6cff
  1. 146
      db/memtable.cc

@ -999,77 +999,147 @@ static bool SaveValue(void* arg, const char* entry) {
type = kTypeRangeDeletion; type = kTypeRangeDeletion;
} }
switch (type) { switch (type) {
case kTypeBlobIndex: case kTypeBlobIndex: {
case kTypeWideColumnEntity: if (!s->do_merge) {
*(s->status) = Status::NotSupported(
"GetMergeOperands not supported by stacked BlobDB");
*(s->found_final_value) = true;
return false;
}
if (*(s->merge_in_progress)) { if (*(s->merge_in_progress)) {
*(s->status) = Status::NotSupported("Merge operator not supported"); *(s->status) = Status::NotSupported(
} else if (!s->do_merge) { "Merge operator not supported by stacked BlobDB");
*(s->status) = Status::NotSupported("GetMergeOperands not supported"); *(s->found_final_value) = true;
} else if (type == kTypeBlobIndex) { return false;
}
if (s->is_blob_index == nullptr) { if (s->is_blob_index == nullptr) {
ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index."); ROCKS_LOG_ERROR(s->logger, "Encountered unexpected blob index.");
*(s->status) = Status::NotSupported( *(s->status) = Status::NotSupported(
"Encounter unsupported blob value. Please open DB with " "Encountered unexpected blob index. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); "ROCKSDB_NAMESPACE::blob_db::BlobDB.");
*(s->found_final_value) = true;
return false;
} }
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
}
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK();
if (s->value) {
s->value->assign(v.data(), v.size());
} else if (s->columns) {
s->columns->SetPlainValue(v);
}
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadUnlock();
} }
if (!s->status->ok()) {
*(s->found_final_value) = true; *(s->found_final_value) = true;
*(s->is_blob_index) = true;
return false; return false;
} }
FALLTHROUGH_INTENDED;
case kTypeValue: { case kTypeValue: {
if (s->inplace_update_support) { if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock(); s->mem->GetLock(s->key->user_key())->ReadLock();
} }
Slice v = GetLengthPrefixedSlice(key_ptr + key_length); Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK(); *(s->status) = Status::OK();
if (*(s->merge_in_progress)) {
if (s->do_merge) { if (!s->do_merge) {
if (s->value != nullptr) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->logger,
s->statistics, s->clock, nullptr /* result_operand */, true);
}
} else {
// Preserve the value with the goal of returning it as part of // Preserve the value with the goal of returning it as part of
// raw merge operands to the user // raw merge operands to the user
merge_context->PushOperand( merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */); v, s->inplace_update_support == false /* operand_pinned */);
} else if (*(s->merge_in_progress)) {
assert(s->do_merge);
if (s->value || s->columns) {
std::string result;
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), &result, s->logger, s->statistics,
s->clock, nullptr /* result_operand */, true);
if (s->status->ok()) {
if (s->value) {
*(s->value) = std::move(result);
} else {
assert(s->columns);
s->columns->SetPlainValue(result);
}
}
} }
} else if (!s->do_merge) {
// Preserve the value with the goal of returning it as part of
// raw merge operands to the user
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
} else if (s->value) { } else if (s->value) {
if (type != kTypeWideColumnEntity) {
assert(type == kTypeValue || type == kTypeBlobIndex);
s->value->assign(v.data(), v.size()); s->value->assign(v.data(), v.size());
} else { } else if (s->columns) {
Slice value; s->columns->SetPlainValue(v);
*(s->status) = }
WideColumnSerialization::GetValueOfDefaultColumn(v, value);
if (s->status->ok()) { if (s->inplace_update_support) {
s->value->assign(value.data(), value.size()); s->mem->GetLock(s->key->user_key())->ReadUnlock();
}
*(s->found_final_value) = true;
if (s->is_blob_index != nullptr) {
*(s->is_blob_index) = false;
}
return false;
}
case kTypeWideColumnEntity: {
if (!s->do_merge) {
*(s->status) = Status::NotSupported(
"GetMergeOperands not supported for wide-column entities");
*(s->found_final_value) = true;
return false;
}
if (*(s->merge_in_progress)) {
*(s->status) = Status::NotSupported(
"Merge not supported for wide-column entities");
*(s->found_final_value) = true;
return false;
}
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
} }
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK();
if (s->value) {
Slice value_of_default;
*(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
v, value_of_default);
if (s->status->ok()) {
s->value->assign(value_of_default.data(), value_of_default.size());
} }
} else if (s->columns) { } else if (s->columns) {
if (type != kTypeWideColumnEntity) {
s->columns->SetPlainValue(v);
} else {
*(s->status) = s->columns->SetWideColumnValue(v); *(s->status) = s->columns->SetWideColumnValue(v);
} }
}
if (s->inplace_update_support) { if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadUnlock(); s->mem->GetLock(s->key->user_key())->ReadUnlock();
} }
*(s->found_final_value) = true; *(s->found_final_value) = true;
if (s->is_blob_index != nullptr) { if (s->is_blob_index != nullptr) {
*(s->is_blob_index) = (type == kTypeBlobIndex); *(s->is_blob_index) = false;
} }
return false; return false;

Loading…
Cancel
Save