Fail early when `merge_operator` not configured (#7667)

Summary:
An application may accidentally write merge operands without properly configuring `merge_operator`. We should alert them as early as possible that there's an API misuse. Previously RocksDB only notified them when a query or background operation needed to merge but couldn't. With this PR, RocksDB notifies them of the problem before applying the merge operand to the memtable (although it may already be in WAL, which seems it'd cause a crash loop until they enable `merge_operator`).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7667

Reviewed By: riversand963

Differential Revision: D24933360

Pulled By: ajkr

fbshipit-source-id: 3a4a2ceb0b7aed184113dd03b8efd735a8332f7f
main
Andrew Kryczka 4 years ago committed by Facebook GitHub Bot
parent 7582c5682b
commit 1c5f13f2a5
  1. 4
      HISTORY.md
  2. 4
      db/write_batch.cc
  3. 39
      db/write_batch_test.cc
  4. 2
      utilities/transactions/optimistic_transaction_test.cc

@ -1,4 +1,8 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased
### Behavior Changes
* Attempting to write a merge operand without explicitly configuring `merge_operator` now fails immediately, causing the DB to enter read-only mode. Previously, failure was deferred until the `merge_operator` was needed by a user read or a background operation.
## 6.15.0 (11/13/2020) ## 6.15.0 (11/13/2020)
### Bug Fixes ### Bug Fixes
* Fixed a bug in the following combination of features: indexes with user keys (`format_version >= 3`), indexes are partitioned (`index_type == kTwoLevelIndexSearch`), and some index partitions are pinned in memory (`BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache`). The bug could cause keys to be truncated when read from the index leading to wrong read results or other unexpected behavior. * Fixed a bug in the following combination of features: indexes with user keys (`format_version >= 3`), indexes are partitioned (`index_type == kTwoLevelIndexSearch`), and some index partitions are pinned in memory (`BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache`). The bug could cause keys to be truncated when read from the index leading to wrong read results or other unexpected behavior.

@ -1715,6 +1715,10 @@ class MemTableInserter : public WriteBatch::Handler {
MemTable* mem = cf_mems_->GetMemTable(); MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetImmutableMemTableOptions(); auto* moptions = mem->GetImmutableMemTableOptions();
if (moptions->merge_operator == nullptr) {
return Status::InvalidArgument(
"Merge requires `ColumnFamilyOptions::merge_operator != nullptr`");
}
bool perform_merge = false; bool perform_merge = false;
assert(!concurrent_memtable_writes_ || assert(!concurrent_memtable_writes_ ||
moptions->max_successive_merges == 0); moptions->max_successive_merges == 0);

@ -7,12 +7,13 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/db.h"
#include <memory> #include <memory>
#include "db/column_family.h" #include "db/column_family.h"
#include "db/db_test_util.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h"
#include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/write_batch_with_index.h"
@ -23,11 +24,15 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
static std::string PrintContents(WriteBatch* b) { static std::string PrintContents(WriteBatch* b,
bool merge_operator_supported = true) {
InternalKeyComparator cmp(BytewiseComparator()); InternalKeyComparator cmp(BytewiseComparator());
auto factory = std::make_shared<SkipListFactory>(); auto factory = std::make_shared<SkipListFactory>();
Options options; Options options;
options.memtable_factory = factory; options.memtable_factory = factory;
if (merge_operator_supported) {
options.merge_operator.reset(new TestPutOperator());
}
ImmutableCFOptions ioptions(options); ImmutableCFOptions ioptions(options);
WriteBufferManager wb(options.db_write_buffer_size); WriteBufferManager wb(options.db_write_buffer_size);
MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
@ -113,15 +118,17 @@ static std::string PrintContents(WriteBatch* b) {
state.append(NumberToString(ikey.sequence)); state.append(NumberToString(ikey.sequence));
} }
} }
EXPECT_EQ(b->HasPut(), put_count > 0); if (s.ok()) {
EXPECT_EQ(b->HasDelete(), delete_count > 0); EXPECT_EQ(b->HasPut(), put_count > 0);
EXPECT_EQ(b->HasSingleDelete(), single_delete_count > 0); EXPECT_EQ(b->HasDelete(), delete_count > 0);
EXPECT_EQ(b->HasDeleteRange(), delete_range_count > 0); EXPECT_EQ(b->HasSingleDelete(), single_delete_count > 0);
EXPECT_EQ(b->HasMerge(), merge_count > 0); EXPECT_EQ(b->HasDeleteRange(), delete_range_count > 0);
if (!s.ok()) { EXPECT_EQ(b->HasMerge(), merge_count > 0);
if (count != WriteBatchInternal::Count(b)) {
state.append("CountMismatch()");
}
} else {
state.append(s.ToString()); state.append(s.ToString());
} else if (count != WriteBatchInternal::Count(b)) {
state.append("CountMismatch()");
} }
delete mem->Unref(); delete mem->Unref();
return state; return state;
@ -354,6 +361,16 @@ TEST_F(WriteBatchTest, MergeNotImplemented) {
ASSERT_OK(batch.Iterate(&handler)); ASSERT_OK(batch.Iterate(&handler));
} }
TEST_F(WriteBatchTest, MergeWithoutOperatorInsertionFailure) {
WriteBatch batch;
ASSERT_OK(batch.Merge(Slice("foo"), Slice("bar")));
ASSERT_EQ(1u, batch.Count());
ASSERT_EQ(
"Invalid argument: Merge requires `ColumnFamilyOptions::merge_operator "
"!= nullptr`",
PrintContents(&batch, false /* merge_operator_supported */));
}
TEST_F(WriteBatchTest, Blob) { TEST_F(WriteBatchTest, Blob) {
WriteBatch batch; WriteBatch batch;
ASSERT_OK(batch.Put(Slice("k1"), Slice("v1"))); ASSERT_OK(batch.Put(Slice("k1"), Slice("v1")));

@ -10,6 +10,7 @@
#include <thread> #include <thread>
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/perf_context.h" #include "rocksdb/perf_context.h"
@ -37,6 +38,7 @@ class OptimisticTransactionTest
options.create_if_missing = true; options.create_if_missing = true;
options.max_write_buffer_number = 2; options.max_write_buffer_number = 2;
options.max_write_buffer_size_to_maintain = 1600; options.max_write_buffer_size_to_maintain = 1600;
options.merge_operator.reset(new TestPutOperator());
dbname = test::PerThreadDBPath("optimistic_transaction_testdb"); dbname = test::PerThreadDBPath("optimistic_transaction_testdb");
DestroyDB(dbname, options); DestroyDB(dbname, options);

Loading…
Cancel
Save