Fix a bug where GetContext does not update READ_NUM_MERGE_OPERANDS (#10925)

Summary:
The patch fixes a bug where `GetContext::Merge` and `GetContext::MergeWithEntity` do not update the ticker `READ_NUM_MERGE_OPERANDS` because they implicitly use the default parameter value of `update_num_ops_stats=false` when calling `MergeHelper::TimedFullMerge` and `MergeHelper::TimedFullMergeWithEntity`, respectively. Also, to prevent such issues going forward, the PR removes the default parameter values from the `TimedFullMerge` methods. In addition, it removes an unused/unnecessary parameter (`result_operand`) from `TimedFullMergeWithEntity`, and does some cleanup at the call sites of these methods.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10925

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D41096453

Pulled By: ltamasi

fbshipit-source-id: fc60646d32b4d516b8fe81e265c3f020a32fd7f8
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent 75aca74017
commit fbd9077d66
  1. 1
      HISTORY.md
  2. 3
      db/db_iter.cc
  3. 11
      db/memtable.cc
  4. 18
      db/merge_helper.cc
  5. 12
      db/merge_helper.h
  6. 4
      db/version_set.cc
  7. 3
      db/write_batch.cc
  8. 5
      table/get_context.cc
  9. 17
      utilities/write_batch_with_index/write_batch_with_index_internal.cc

@ -7,6 +7,7 @@
* Fix FIFO compaction causing corruption of overlapping seqnos in L0 files due to ingesting files of overlapping seqnos with memtable's under `CompactionOptionsFIFO::allow_compaction=true` or `CompactionOptionsFIFO::age_for_warm>0` or `CompactRange()/CompactFiles()` is used. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected. * Fix FIFO compaction causing corruption of overlapping seqnos in L0 files due to ingesting files of overlapping seqnos with memtable's under `CompactionOptionsFIFO::allow_compaction=true` or `CompactionOptionsFIFO::age_for_warm>0` or `CompactRange()/CompactFiles()` is used. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected.
* Fix memory corruption error in scans if async_io is enabled. Memory corruption happened if there is IOError while reading the data leading to empty buffer and other buffer already in progress of async read goes again for reading. * Fix memory corruption error in scans if async_io is enabled. Memory corruption happened if there is IOError while reading the data leading to empty buffer and other buffer already in progress of async read goes again for reading.
* Fix failed memtable flush retry bug that could cause wrongly ordered updates, which would surface to writers as `Status::Corruption` in case of `force_consistency_checks=true` (default). It affects use cases that enable both parallel flush (`max_background_flushes > 1` or `max_background_jobs >= 8`) and non-default memtable count (`max_write_buffer_number > 2`). * Fix failed memtable flush retry bug that could cause wrongly ordered updates, which would surface to writers as `Status::Corruption` in case of `force_consistency_checks=true` (default). It affects use cases that enable both parallel flush (`max_background_flushes > 1` or `max_background_jobs >= 8`) and non-default memtable count (`max_write_buffer_number > 2`).
* Fixed an issue where the `READ_NUM_MERGE_OPERANDS` ticker was not updated when the base key-value or tombstone was read from an SST file.
### New Features ### New Features
* Add basic support for user-defined timestamp to Merge (#10819). * Add basic support for user-defined timestamp to Merge (#10819).

@ -1247,7 +1247,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
Status DBIter::Merge(const Slice* val, const Slice& user_key) { Status DBIter::Merge(const Slice* val, const Slice& user_key) {
Status s = MergeHelper::TimedFullMerge( Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key, val, merge_context_.GetOperands(), merge_operator_, user_key, val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, clock_, &pinned_value_, true); &saved_value_, logger_, statistics_, clock_, &pinned_value_,
/* update_num_ops_stats */ true);
if (!s.ok()) { if (!s.ok()) {
valid_ = false; valid_ = false;
status_ = s; status_ = s;

@ -1069,7 +1069,8 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = MergeHelper::TimedFullMerge( *(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v, merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->columns, s->logger, merge_context->GetOperands(), s->value, s->columns, s->logger,
s->statistics, s->clock, nullptr /* result_operand */, true); s->statistics, s->clock, /* result_operand */ nullptr,
/* update_num_ops_stats */ true);
} }
} else if (s->value) { } else if (s->value) {
s->value->assign(v.data(), v.size()); s->value->assign(v.data(), v.size());
@ -1118,7 +1119,7 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = MergeHelper::TimedFullMergeWithEntity( *(s->status) = MergeHelper::TimedFullMergeWithEntity(
merge_operator, s->key->user_key(), v, merge_operator, s->key->user_key(), v,
merge_context->GetOperands(), s->value, s->columns, s->logger, merge_context->GetOperands(), s->value, s->columns, s->logger,
s->statistics, s->clock, nullptr /* result_operand */, true); s->statistics, s->clock, /* update_num_ops_stats */ true);
} }
} else if (s->value) { } else if (s->value) {
Slice value_of_default; Slice value_of_default;
@ -1152,7 +1153,8 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = MergeHelper::TimedFullMerge( *(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr, merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->columns, s->logger, merge_context->GetOperands(), s->value, s->columns, s->logger,
s->statistics, s->clock, nullptr /* result_operand */, true); s->statistics, s->clock, /* result_operand */ nullptr,
/* update_num_ops_stats */ true);
} }
} else { } else {
*(s->status) = Status::NotFound(); *(s->status) = Status::NotFound();
@ -1181,7 +1183,8 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = MergeHelper::TimedFullMerge( *(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr, merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->columns, s->logger, merge_context->GetOperands(), s->value, s->columns, s->logger,
s->statistics, s->clock, nullptr /* result_operand */, true); s->statistics, s->clock, /* result_operand */ nullptr,
/* update_num_ops_stats */ true);
} }
*(s->found_final_value) = true; *(s->found_final_value) = true;

@ -146,7 +146,7 @@ Status MergeHelper::TimedFullMergeWithEntity(
const MergeOperator* merge_operator, const Slice& key, Slice base_entity, const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
const std::vector<Slice>& operands, std::string* value, const std::vector<Slice>& operands, std::string* value,
PinnableWideColumns* columns, Logger* logger, Statistics* statistics, PinnableWideColumns* columns, Logger* logger, Statistics* statistics,
SystemClock* clock, Slice* result_operand, bool update_num_ops_stats) { SystemClock* clock, bool update_num_ops_stats) {
assert(value || columns); assert(value || columns);
assert(!value || !columns); assert(!value || !columns);
@ -171,6 +171,8 @@ Status MergeHelper::TimedFullMergeWithEntity(
std::string result; std::string result;
{ {
constexpr Slice* result_operand = nullptr;
const Status s = TimedFullMerge( const Status s = TimedFullMerge(
merge_operator, key, &value_of_default, operands, &result, logger, merge_operator, key, &value_of_default, operands, &result, logger,
statistics, clock, result_operand, update_num_ops_stats); statistics, clock, result_operand, update_num_ops_stats);
@ -380,9 +382,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
val_ptr = nullptr; val_ptr = nullptr;
} }
std::string merge_result; std::string merge_result;
s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr, s = TimedFullMerge(
merge_context_.GetOperands(), &merge_result, logger_, user_merge_operator_, ikey.user_key, val_ptr,
stats_, clock_); merge_context_.GetOperands(), &merge_result, logger_, stats_, clock_,
/* result_operand */ nullptr, /* update_num_ops_stats */ false);
// We store the result in keys_.back() and operands_.back() // We store the result in keys_.back() and operands_.back()
// if nothing went wrong (i.e.: no operand corruption on disk) // if nothing went wrong (i.e.: no operand corruption on disk)
@ -509,9 +512,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
assert(merge_context_.GetNumOperands() >= 1); assert(merge_context_.GetNumOperands() >= 1);
assert(merge_context_.GetNumOperands() == keys_.size()); assert(merge_context_.GetNumOperands() == keys_.size());
std::string merge_result; std::string merge_result;
s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr, s = TimedFullMerge(
merge_context_.GetOperands(), &merge_result, logger_, user_merge_operator_, orig_ikey.user_key, nullptr,
stats_, clock_); merge_context_.GetOperands(), &merge_result, logger_, stats_, clock_,
/* result_operand */ nullptr, /* update_num_ops_stats */ false);
if (s.ok()) { if (s.ok()) {
// The original key encountered // The original key encountered
// We are certain that keys_ is not empty here (see assertions couple of // We are certain that keys_ is not empty here (see assertions couple of

@ -54,24 +54,22 @@ class MergeHelper {
const std::vector<Slice>& operands, const std::vector<Slice>& operands,
std::string* result, Logger* logger, std::string* result, Logger* logger,
Statistics* statistics, SystemClock* clock, Statistics* statistics, SystemClock* clock,
Slice* result_operand = nullptr, Slice* result_operand,
bool update_num_ops_stats = false); bool update_num_ops_stats);
static Status TimedFullMerge(const MergeOperator* merge_operator, static Status TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* base_value, const Slice& key, const Slice* base_value,
const std::vector<Slice>& operands, const std::vector<Slice>& operands,
std::string* value, PinnableWideColumns* columns, std::string* value, PinnableWideColumns* columns,
Logger* logger, Statistics* statistics, Logger* logger, Statistics* statistics,
SystemClock* clock, SystemClock* clock, Slice* result_operand,
Slice* result_operand = nullptr, bool update_num_ops_stats);
bool update_num_ops_stats = false);
static Status TimedFullMergeWithEntity( static Status TimedFullMergeWithEntity(
const MergeOperator* merge_operator, const Slice& key, Slice base_entity, const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
const std::vector<Slice>& operands, std::string* value, const std::vector<Slice>& operands, std::string* value,
PinnableWideColumns* columns, Logger* logger, Statistics* statistics, PinnableWideColumns* columns, Logger* logger, Statistics* statistics,
SystemClock* clock, Slice* result_operand = nullptr, SystemClock* clock, bool update_num_ops_stats);
bool update_num_ops_stats = false);
// During compaction, merge entries until we hit // During compaction, merge entries until we hit
// - a corrupted key // - a corrupted key

@ -2389,7 +2389,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
*status = MergeHelper::TimedFullMerge( *status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, merge_context->GetOperands(), merge_operator_, user_key, nullptr, merge_context->GetOperands(),
str_value, columns, info_log_, db_statistics_, clock_, str_value, columns, info_log_, db_statistics_, clock_,
nullptr /* result_operand */, true); /* result_operand */ nullptr, /* update_num_ops_stats */ true);
if (status->ok()) { if (status->ok()) {
if (LIKELY(value != nullptr)) { if (LIKELY(value != nullptr)) {
value->PinSelf(); value->PinSelf();
@ -2630,7 +2630,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
*status = MergeHelper::TimedFullMerge( *status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(), merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
str_value, info_log_, db_statistics_, clock_, str_value, info_log_, db_statistics_, clock_,
nullptr /* result_operand */, true); /* result_operand */ nullptr, /* update_num_ops_stats */ true);
if (LIKELY(iter->value != nullptr)) { if (LIKELY(iter->value != nullptr)) {
iter->value->PinSelf(); iter->value->PinSelf();
range->AddValueSize(iter->value->size()); range->AddValueSize(iter->value->size());

@ -2502,7 +2502,8 @@ class MemTableInserter : public WriteBatch::Handler {
Status merge_status = MergeHelper::TimedFullMerge( Status merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, &get_value_slice, {value}, &new_value, merge_operator, key, &get_value_slice, {value}, &new_value,
moptions->info_log, moptions->statistics, moptions->info_log, moptions->statistics,
SystemClock::Default().get()); SystemClock::Default().get(), /* result_operand */ nullptr,
/* update_num_ops_stats */ false);
if (!merge_status.ok()) { if (!merge_status.ok()) {
// Failed to merge! // Failed to merge!

@ -471,7 +471,8 @@ void GetContext::Merge(const Slice* value) {
const Status s = MergeHelper::TimedFullMerge( const Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, value, merge_context_->GetOperands(), merge_operator_, user_key_, value, merge_context_->GetOperands(),
pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_, pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_,
statistics_, clock_); statistics_, clock_, /* result_operand */ nullptr,
/* update_num_ops_stats */ true);
if (!s.ok()) { if (!s.ok()) {
state_ = kCorrupt; state_ = kCorrupt;
return; return;
@ -489,7 +490,7 @@ void GetContext::MergeWithEntity(Slice entity) {
const Status s = MergeHelper::TimedFullMergeWithEntity( const Status s = MergeHelper::TimedFullMergeWithEntity(
merge_operator_, user_key_, entity, merge_context_->GetOperands(), merge_operator_, user_key_, entity, merge_context_->GetOperands(),
pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_, pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_,
statistics_, clock_); statistics_, clock_, /* update_num_ops_stats */ true);
if (!s.ok()) { if (!s.ok()) {
state_ = kCorrupt; state_ = kCorrupt;
return; return;

@ -664,22 +664,25 @@ Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
Statistics* statistics = immutable_db_options.statistics.get(); Statistics* statistics = immutable_db_options.statistics.get();
Logger* logger = immutable_db_options.info_log.get(); Logger* logger = immutable_db_options.info_log.get();
SystemClock* clock = immutable_db_options.clock; SystemClock* clock = immutable_db_options.clock;
return MergeHelper::TimedFullMerge(merge_operator, key, value, return MergeHelper::TimedFullMerge(
context.GetOperands(), result, logger, merge_operator, key, value, context.GetOperands(), result, logger,
statistics, clock); statistics, clock, /* result_operand */ nullptr,
/* update_num_ops_stats */ false);
} else if (db_options_ != nullptr) { } else if (db_options_ != nullptr) {
Statistics* statistics = db_options_->statistics.get(); Statistics* statistics = db_options_->statistics.get();
Env* env = db_options_->env; Env* env = db_options_->env;
Logger* logger = db_options_->info_log.get(); Logger* logger = db_options_->info_log.get();
SystemClock* clock = env->GetSystemClock().get(); SystemClock* clock = env->GetSystemClock().get();
return MergeHelper::TimedFullMerge(merge_operator, key, value, return MergeHelper::TimedFullMerge(
context.GetOperands(), result, logger, merge_operator, key, value, context.GetOperands(), result, logger,
statistics, clock); statistics, clock, /* result_operand */ nullptr,
/* update_num_ops_stats */ false);
} else { } else {
const auto cf_opts = cfh->cfd()->ioptions(); const auto cf_opts = cfh->cfd()->ioptions();
return MergeHelper::TimedFullMerge( return MergeHelper::TimedFullMerge(
merge_operator, key, value, context.GetOperands(), result, merge_operator, key, value, context.GetOperands(), result,
cf_opts->logger, cf_opts->stats, cf_opts->clock); cf_opts->logger, cf_opts->stats, cf_opts->clock,
/* result_operand */ nullptr, /* update_num_ops_stats */ false);
} }
} else { } else {
return Status::InvalidArgument("Must provide a column_family"); return Status::InvalidArgument("Must provide a column_family");

Loading…
Cancel
Save