Limit number of merge operands in Cassandra merge operator

Summary:
Now that RocksDB supports conditional merging during point lookups (introduced in #2923), Cassandra value merge operator can be updated to pass in a limit. The limit needs to be passed in from the Cassandra code.
Closes https://github.com/facebook/rocksdb/pull/2947

Differential Revision: D5938454

Pulled By: sagar0

fbshipit-source-id: d64a72d53170d8cf202b53bd648475c3952f7d7f
main
Sagar Vemuri 7 years ago committed by Facebook Github Bot
parent cf51d3eb73
commit bb38cd03a9
  1. 7
      java/rocksjni/cassandra_value_operator.cc
  2. 9
      java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java
  3. 11
      utilities/cassandra/merge_operator.h

@ -23,13 +23,14 @@
/* /*
* Class: org_rocksdb_CassandraValueMergeOperator * Class: org_rocksdb_CassandraValueMergeOperator
* Method: newSharedCassandraValueMergeOperator * Method: newSharedCassandraValueMergeOperator
* Signature: (I)J * Signature: (II)J
*/ */
jlong Java_org_rocksdb_CassandraValueMergeOperator_newSharedCassandraValueMergeOperator( jlong Java_org_rocksdb_CassandraValueMergeOperator_newSharedCassandraValueMergeOperator(
JNIEnv* env, jclass jclazz, jint gcGracePeriodInSeconds) { JNIEnv* env, jclass jclazz, jint gcGracePeriodInSeconds,
jint operands_limit) {
auto* op = new std::shared_ptr<rocksdb::MergeOperator>( auto* op = new std::shared_ptr<rocksdb::MergeOperator>(
new rocksdb::cassandra::CassandraValueMergeOperator( new rocksdb::cassandra::CassandraValueMergeOperator(
gcGracePeriodInSeconds)); gcGracePeriodInSeconds, operands_limit));
return reinterpret_cast<jlong>(op); return reinterpret_cast<jlong>(op);
} }

@ -11,10 +11,15 @@ package org.rocksdb;
*/ */
public class CassandraValueMergeOperator extends MergeOperator { public class CassandraValueMergeOperator extends MergeOperator {
public CassandraValueMergeOperator(int gcGracePeriodInSeconds) { public CassandraValueMergeOperator(int gcGracePeriodInSeconds) {
super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds)); super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds, 0));
} }
private native static long newSharedCassandraValueMergeOperator(int gcGracePeriodInSeconds); public CassandraValueMergeOperator(int gcGracePeriodInSeconds, int operandsLimit) {
super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds, operandsLimit));
}
private native static long newSharedCassandraValueMergeOperator(
int gcGracePeriodInSeconds, int limit);
@Override protected final native void disposeInternal(final long handle); @Override protected final native void disposeInternal(final long handle);
} }

@ -15,8 +15,10 @@ namespace cassandra {
*/ */
class CassandraValueMergeOperator : public MergeOperator { class CassandraValueMergeOperator : public MergeOperator {
public: public:
explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds) explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds,
: gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {} size_t operands_limit = 0)
: gc_grace_period_in_seconds_(gc_grace_period_in_seconds),
operands_limit_(operands_limit) {}
virtual bool FullMergeV2(const MergeOperationInput& merge_in, virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override; MergeOperationOutput* merge_out) const override;
@ -30,8 +32,13 @@ public:
virtual bool AllowSingleOperand() const override { return true; } virtual bool AllowSingleOperand() const override { return true; }
virtual bool ShouldMerge(const std::vector<Slice>& operands) const override {
return operands_limit_ > 0 && operands.size() >= operands_limit_;
}
private: private:
int32_t gc_grace_period_in_seconds_; int32_t gc_grace_period_in_seconds_;
size_t operands_limit_;
}; };
} // namespace cassandra } // namespace cassandra
} // namespace rocksdb } // namespace rocksdb

Loading…
Cancel
Save